auxon_sdk/plugin_utils/
mutation.rs

1#![allow(unused)]
2
3use tracing::{debug, error, info, trace, warn};
4use url::Url;
5use uuid::Uuid;
6
7use crate::{
8    api::{AttrKey, AttrVal, TimelineId},
9    auth_token::AuthToken,
10    mutation_plane::{
11        protocol::{LeafwardsMessage, RootwardsMessage, MUTATION_PROTOCOL_VERSION},
12        types::{MutationId, MutatorId, ParticipantId},
13    },
14    mutation_plane_client::parent_connection::{
15        CommsError, MutationParentClientInitializationError, MutationParentConnection,
16    },
17    mutator_protocol::{
18        actuator::MutatorActuator,
19        descriptor::{
20            owned::{
21                MutatorLayer, MutatorOperation, MutatorStatefulness, OrganizationCustomMetadata,
22                OwnedMutatorDescriptor,
23            },
24            MutatorDescriptor,
25        },
26        mutator::ActuatorDescriptor,
27    },
28};
29use std::{
30    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
31    time::Duration,
32};
33
/// Behavior that a [`MutatorHost`] drives in response to mutation plane messages.
///
/// Implementations are registered with [`MutatorHost::register_mutator`]; the host then calls
/// back into them when mutations are commanded or cleared.
pub trait Mutator {
    /// Identifier used as the key under which the host stores this mutator, and which is sent
    /// in the mutator announcement.
    fn id(&self) -> MutatorId;

    /// Descriptor whose description attributes are sent along with the mutator announcement.
    fn descriptor(&self) -> OwnedMutatorDescriptor;

    /// Return true on success, false on failure
    ///
    /// Called by the host when a new mutation is commanded for this mutator; `params` holds the
    /// key/value parameters that came with the command.
    fn inject(&mut self, mutation_id: MutationId, params: BTreeMap<String, AttrVal>) -> bool;

    /// Called by the host for each mutation it clears on this mutator.
    fn clear_mutation(&mut self, mutation_id: &MutationId);

    /// Called by the host after clearing mutations, either because the clear request asked for
    /// a reset or because all mutations are being cleared.
    fn reset(&mut self);
}
43
/// A mutation plane participant that hosts a set of [`Mutator`]s.
///
/// Created with [`MutatorHost::connect_and_authenticate`]; mutators are added with
/// [`MutatorHost::register_mutator`] and incoming messages are dispatched by
/// [`MutatorHost::message_loop`] / [`MutatorHost::handle_message`].
pub struct MutatorHost {
    /// Participant id allocated for this host when it connected.
    participant_id: ParticipantId,
    /// Connection to the parent in the mutation plane.
    pub mutation_conn: MutationParentConnection,
    /// Hosted mutators, keyed by the id they reported at registration time.
    mutators: BTreeMap<MutatorId, Box<dyn Mutator + Send>>,
    /// Mutations that have been injected and not yet cleared, grouped by mutator.
    active_mutations: HashMap<MutatorId, HashSet<MutationId>>,

    /// Optional ingest client used to log events about the host's activity.
    ingest: Option<super::ingest::Client>,
    /// Ordering value passed with each logged event; incremented after every send attempt.
    ingest_ordering: u128,
    /// Flag cleared by `disable_mutation_communicated_logging`.
    log_comms: bool,
    /// Flag cleared by `disable_mutation_inject_and_clear_logging`.
    log_inject_and_clear: bool,
}
55
56impl MutatorHost {
57    pub async fn connect_and_authenticate(
58        endpoint: &Url,
59        allow_insecure_tls: bool,
60        auth_token: AuthToken,
61        mut ingest: Option<super::ingest::Client>,
62    ) -> Result<MutatorHost, MutationParentClientInitializationError> {
63        debug!(%endpoint, %allow_insecure_tls, "Connecting to mutation plane");
64
65        let mut ingest_ordering = 0u128;
66        if let Some(i) = ingest.as_mut() {
67            let tl_id = TimelineId::allocate();
68            i.switch_timeline(tl_id).await.unwrap();
69            i.send_timeline_attrs("MutatorHost", []).await.unwrap();
70            let _ = i
71                .send_event("connecting_to_mutation_plane", ingest_ordering, [])
72                .await;
73            ingest_ordering += 1;
74        }
75
76        let mut mutation_conn =
77            MutationParentConnection::connect(endpoint, allow_insecure_tls).await?;
78
79        let mut_plane_pid = ParticipantId::allocate();
80        debug!(%mut_plane_pid, "Authenticating");
81        if let Some(i) = ingest.as_mut() {
82            let _ = i
83                .send_event(
84                    "authenticating",
85                    ingest_ordering,
86                    [("participant_id", mut_plane_pid.to_string().into())],
87                )
88                .await;
89            ingest_ordering += 1;
90        }
91
92        mutation_conn
93            .write_msg(&RootwardsMessage::ChildAuthAttempt {
94                child_participant_id: mut_plane_pid,
95                version: MUTATION_PROTOCOL_VERSION,
96                token: auth_token.as_ref().to_vec(),
97            })
98            .await;
99
100        debug!("Awaiting authentication response");
101        match mutation_conn.read_msg().await? {
102            LeafwardsMessage::ChildAuthOutcome {
103                child_participant_id,
104                version: _,
105                ok,
106                message,
107            } => {
108                if child_participant_id == mut_plane_pid {
109                    if ok {
110                        if let Some(i) = ingest.as_mut() {
111                            let _ = i.send_event("authenticated", ingest_ordering, []).await;
112                            ingest_ordering += 1;
113                        }
114                    } else {
115                        if let Some(i) = ingest.as_mut() {
116                            let _ = i
117                                .send_event(
118                                    "authentication_failed",
119                                    ingest_ordering,
120                                    message.as_ref().map(|s| ("message", AttrVal::from(s))),
121                                )
122                                .await;
123                        }
124                        return Err(
125                            MutationParentClientInitializationError::AuthenticationFailed(
126                                message.unwrap_or_else(|| "(no message)".to_string()),
127                            ),
128                        );
129                    }
130                } else {
131                    if let Some(i) = ingest.as_mut() {
132                        let _ = i
133                            .send_event(
134                                "authentication_failed",
135                                ingest_ordering,
136                                message.as_ref().map(|s| ("message", AttrVal::from(s))),
137                            )
138                            .await;
139                    }
140                    error!("Mutation plane auth outcome received for a different participant");
141                    return Err(MutationParentClientInitializationError::AuthWrongParticipant);
142                }
143            }
144            resp => {
145                error!(?resp, "Mutation plane unexpected auth response");
146                return Err(MutationParentClientInitializationError::UnexpectedAuthResponse);
147            }
148        }
149
150        debug!("Authenticated");
151        let mut conn = MutatorHost {
152            participant_id: mut_plane_pid,
153            mutation_conn,
154            mutators: Default::default(),
155            active_mutations: Default::default(),
156
157            ingest,
158            ingest_ordering: 0,
159            log_comms: true,
160            log_inject_and_clear: true,
161        };
162
163        conn.send_event("mutation_plane_connected", []).await;
164        Ok(conn)
165    }
166
    /// Disable automatic logging of 'mutation communicated' events on the mutator timeline.
    ///
    /// Clears the host's `log_comms` flag, which starts out `true` when the host is created by
    /// [`MutatorHost::connect_and_authenticate`].
    pub fn disable_mutation_communicated_logging(&mut self) {
        self.log_comms = false;
    }
171
    /// Disable automatic logging of mutation injected/cleared events on the mutator host timeline.
    ///
    /// You might want to do this if you have arranged to log those events separately, on a timeline
    /// that is more directly relevant to system operation.
    ///
    /// Clears the host's `log_inject_and_clear` flag, which starts out `true` when the host is
    /// created by [`MutatorHost::connect_and_authenticate`].
    pub fn disable_mutation_inject_and_clear_logging(&mut self) {
        self.log_inject_and_clear = false;
    }
179
180    pub async fn register_mutator(
181        &mut self,
182        mutator: Box<dyn Mutator + Send>,
183    ) -> Result<(), CommsError> {
184        let mutator_id = mutator.id();
185        let ann = mutator_announcement(self.participant_id, mutator.as_ref(), &mutator_id);
186        self.mutators.insert(mutator.id(), mutator);
187        self.mutation_conn.write_msg(&ann).await?;
188
189        self.send_event(
190            "modality.mutator.announced",
191            [("event.mutator.id", mutator_id_to_attr_val(mutator_id))],
192        )
193        .await;
194
195        Ok(())
196    }
197
198    pub async fn message_loop(&mut self) -> Result<(), CommsError> {
199        loop {
200            let msg = self.mutation_conn.read_msg().await?;
201            self.handle_message(msg).await;
202        }
203    }
204
205    pub async fn handle_message(&mut self, msg: LeafwardsMessage) {
206        trace!(?msg, "handle_message");
207        match msg {
208            LeafwardsMessage::RequestForMutatorAnnouncements {} => {
209                self.announce_all_mutators().await;
210            }
211
212            LeafwardsMessage::NewMutation {
213                mutator_id,
214                mutation_id,
215                maybe_trigger_mask: _,
216                params,
217            } => {
218                self.new_mutation(mutator_id, mutation_id, params).await;
219            }
220
221            LeafwardsMessage::ClearSingleMutation {
222                mutator_id,
223                mutation_id,
224                reset_if_active,
225            } => {
226                self.clear_single_mutation(mutator_id, mutation_id, reset_if_active)
227                    .await;
228            }
229
230            LeafwardsMessage::ClearMutationsForMutator {
231                mutator_id,
232                reset_if_active,
233            } => {
234                self.clear_mutations_for_mutator(mutator_id, reset_if_active)
235                    .await;
236            }
237
238            LeafwardsMessage::ClearMutations {} => {
239                self.clear_mutations().await;
240            }
241
242            LeafwardsMessage::UpdateTriggerState {
243                mutator_id: _,
244                mutation_id: _,
245                maybe_trigger_crdt: _,
246            } => {
247                // Not yet implemented
248            }
249
250            _ => {
251                warn!("Unexpected message");
252                self.send_event("unexpected_message", []).await;
253            }
254        }
255    }
256
257    async fn announce_all_mutators(&mut self) {
258        // We can't use the mutators iterator across an await point
259        let mut announces = Vec::with_capacity(self.mutators.len());
260        let mut mutator_ids = Vec::with_capacity(self.mutators.len());
261        for (mutator_id, mutator) in self.mutators.iter() {
262            let ann = mutator_announcement(self.participant_id, mutator.as_ref(), mutator_id);
263            announces.push(ann);
264            mutator_ids.push(*mutator_id);
265        }
266
267        for ann in announces.into_iter() {
268            if let Err(e) = self.mutation_conn.write_msg(&ann).await {
269                error!(
270                    err = &e as &dyn std::error::Error,
271                    "Failed to announce mutator; aborting batch announce"
272                );
273                // There's no reason to believe the next one would work
274                return;
275            }
276        }
277
278        for mutator_id in mutator_ids.into_iter() {
279            self.send_event(
280                "modality.mutator.announced",
281                [("event.mutator.id", mutator_id_to_attr_val(mutator_id))],
282            )
283            .await;
284        }
285    }
286
287    async fn clear_single_mutation(
288        &mut self,
289        mutator_id: MutatorId,
290        mutation_id: MutationId,
291        reset_if_active: bool,
292    ) {
293        self.send_event(
294            "modality.mutation.clear_communicated",
295            [
296                ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
297                ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
298                ("event.mutation.success", true.into()),
299            ],
300        )
301        .await;
302
303        let Some(mutator) = self.mutators.get_mut(&mutator_id) else {
304            warn!(
305                %mutator_id,
306                %mutation_id,
307                "Cannot clear mutation, mutator is not hosted by this client"
308            );
309            return;
310        };
311
312        let Some(active_mutation_ids_for_mutator) = self.active_mutations.get_mut(&mutator_id)
313        else {
314            warn!(
315                %mutator_id,
316                %mutation_id,
317                "Cannot clear mutation, no active mutations for mutator"
318            );
319            return;
320        };
321
322        if !active_mutation_ids_for_mutator.remove(&mutation_id) {
323            warn!(
324                %mutator_id,
325                %mutation_id,
326                "Cannot clear mutation, mutation not active"
327            );
328            return;
329        }
330
331        tracing::debug!(%mutator_id, %mutation_id, "Clearing mutation");
332
333        mutator.clear_mutation(&mutation_id);
334        if reset_if_active {
335            mutator.reset();
336        }
337    }
338
339    async fn clear_mutations_for_mutator(&mut self, mutator_id: MutatorId, reset_if_active: bool) {
340        let Some(mutator) = self.mutators.get_mut(&mutator_id) else {
341            warn!(
342                %mutator_id,
343                "Cannot clear mutations, mutator is not hosted by this client"
344            );
345            return;
346        };
347
348        let Some(active_mutation_ids_for_mutator) = self.active_mutations.remove(&mutator_id)
349        else {
350            warn!(
351                %mutator_id,
352                "Cannot clear mutations, no active mutations for mutator"
353            );
354            return;
355        };
356
357        let mut cleared_mutations = vec![];
358        for mutation_id in active_mutation_ids_for_mutator.into_iter() {
359            cleared_mutations.push(mutation_id);
360            tracing::debug!(%mutator_id, %mutation_id, "Clearing mutation");
361            mutator.clear_mutation(&mutation_id);
362
363            if reset_if_active {
364                mutator.reset();
365            }
366        }
367
368        for mutation_id in cleared_mutations {
369            self.send_event(
370                "modality.mutation.clear_communicated",
371                [
372                    ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
373                    ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
374                ],
375            )
376            .await;
377        }
378    }
379
380    async fn clear_mutations(&mut self) {
381        let mut cleared_mutations = vec![];
382        for (mutator_id, active_mutation_ids_for_mutator) in self.active_mutations.drain() {
383            let Some(mutator) = self.mutators.get_mut(&mutator_id) else {
384                warn!(
385                    %mutator_id,
386                    "Inconsistent internal state; cannot clear mutations for unregistered mutator'"
387                );
388                continue;
389            };
390
391            for mutation_id in active_mutation_ids_for_mutator.into_iter() {
392                cleared_mutations.push((mutator_id, mutation_id));
393                mutator.clear_mutation(&mutation_id);
394                tracing::debug!(%mutator_id, %mutation_id, "Clearing mutation");
395            }
396
397            mutator.reset();
398        }
399
400        for (mutator_id, mutation_id) in cleared_mutations {
401            self.send_event(
402                "modality.mutation.clear_communicated",
403                [
404                    ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
405                    ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
406                ],
407            )
408            .await;
409        }
410    }
411
412    async fn new_mutation(
413        &mut self,
414        mutator_id: MutatorId,
415        mutation_id: crate::mutation_plane::types::MutationId,
416        params: crate::mutation_plane::types::AttrKvs,
417    ) {
418        self.send_event(
419            "modality.mutation.command_communicated",
420            [
421                ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
422                ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
423            ],
424        )
425        .await;
426
427        let Some(mutator) = self.mutators.get_mut(&mutator_id) else {
428            tracing::warn!(
429                mutator_id = %mutator_id,
430                "Failed to handle new mutation, mutator not hosted by this client");
431            return;
432        };
433
434        let success = mutator.inject(mutation_id, attr_kvs_to_map(params));
435        self.active_mutations
436            .entry(mutator_id)
437            .or_default()
438            .insert(mutation_id);
439
440        self.send_event(
441            "modality.mutation.injected",
442            [
443                ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
444                ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
445                ("event.mutation.success", success.into()),
446            ],
447        )
448        .await;
449    }
450
451    async fn send_event(&mut self, name: &str, attrs: impl IntoIterator<Item = (&str, AttrVal)>) {
452        let Some(i) = self.ingest.as_mut() else {
453            return;
454        };
455
456        let res = i.send_event(name, self.ingest_ordering, attrs).await;
457
458        if let Err(e) = res {
459            warn!(
460                err = &e as &dyn std::error::Error,
461                "Failed to send event to modality"
462            )
463        }
464
465        self.ingest_ordering += 1;
466    }
467}
468
469fn attr_kvs_to_map(
470    params: crate::mutation_plane::types::AttrKvs,
471) -> BTreeMap<String, crate::api::AttrVal> {
472    let mut map = BTreeMap::new();
473    for kv in params.0.into_iter() {
474        map.insert(kv.key, kv.value);
475    }
476    map
477}
478
479fn mutator_announcement(
480    participant_id: ParticipantId,
481    m: &(impl Mutator + ?Sized),
482    mutator_id: &MutatorId,
483) -> RootwardsMessage {
484    let mutator_attrs = m
485        .descriptor()
486        .get_description_attributes()
487        .map(|(k, value)| crate::mutation_plane::types::AttrKv {
488            key: k.to_string(),
489            value,
490        })
491        .collect();
492    RootwardsMessage::MutatorAnnouncement {
493        participant_id,
494        mutator_id: *mutator_id,
495        mutator_attrs: crate::mutation_plane::types::AttrKvs(mutator_attrs),
496    }
497}
498
/// Name of the environment variable that overrides the mutation protocol parent URL.
const MUTATION_PROTOCOL_PARENT_URL_ENV_VAR: &str = "MUTATION_PROTOCOL_PARENT_URL";
/// Parent URL used when the environment variable is not set.
const MUTATION_PROTOCOL_PARENT_URL_DEFAULT: &str = "modality-mutation://127.0.0.1:14192";
501
502fn mutation_proto_parent_url() -> Result<url::Url, MutationProtocolUrlError> {
503    match std::env::var(MUTATION_PROTOCOL_PARENT_URL_ENV_VAR) {
504        Ok(val) => Ok(Url::parse(&val)?),
505        Err(std::env::VarError::NotUnicode(_)) => {
506            Err(MutationProtocolUrlError::EnvVarSpecifiedMutationProtoParentUrlNonUtf8)
507        }
508        Err(std::env::VarError::NotPresent) => {
509            Ok(Url::parse(MUTATION_PROTOCOL_PARENT_URL_DEFAULT)?)
510        }
511    }
512}
513
/// Errors produced while resolving the mutation protocol parent URL.
#[derive(Debug, thiserror::Error)]
pub enum MutationProtocolUrlError {
    /// The environment variable was set, but its value was not valid Unicode.
    #[error(
        "The MUTATION_PROTOCOL_PARENT_URL environment variable contained a non-UTF-8-compatible string"
    )]
    EnvVarSpecifiedMutationProtoParentUrlNonUtf8,

    /// The URL string could not be parsed; wraps the underlying parse error.
    #[error("Mutation protocol parent URL error")]
    MutationProtoParentUrl(#[from] url::ParseError),
}
524
525fn mutation_id_to_attr_val(mutation_id: MutationId) -> AttrVal {
526    uuid_to_integer_attr_val(mutation_id.as_ref())
527}
528
529pub fn mutator_id_to_attr_val(mutator_id: MutatorId) -> AttrVal {
530    uuid_to_integer_attr_val(mutator_id.as_ref())
531}
532
533fn uuid_to_integer_attr_val(u: &Uuid) -> AttrVal {
534    i128::from_le_bytes(*u.as_bytes()).into()
535}