1#![allow(unused)]
2
3use tracing::{debug, error, info, trace, warn};
4use url::Url;
5use uuid::Uuid;
6
7use crate::{
8 api::{AttrKey, AttrVal, TimelineId},
9 auth_token::AuthToken,
10 mutation_plane::{
11 protocol::{LeafwardsMessage, RootwardsMessage, MUTATION_PROTOCOL_VERSION},
12 types::{MutationId, MutatorId, ParticipantId},
13 },
14 mutation_plane_client::parent_connection::{
15 CommsError, MutationParentClientInitializationError, MutationParentConnection,
16 },
17 mutator_protocol::{
18 actuator::MutatorActuator,
19 descriptor::{
20 owned::{
21 MutatorLayer, MutatorOperation, MutatorStatefulness, OrganizationCustomMetadata,
22 OwnedMutatorDescriptor,
23 },
24 MutatorDescriptor,
25 },
26 mutator::ActuatorDescriptor,
27 },
28};
29use std::{
30 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
31 time::Duration,
32};
33
34pub trait Mutator {
35 fn id(&self) -> MutatorId;
36 fn descriptor(&self) -> OwnedMutatorDescriptor;
37
38 fn inject(&mut self, mutation_id: MutationId, params: BTreeMap<String, AttrVal>) -> bool;
40 fn clear_mutation(&mut self, mutation_id: &MutationId);
41 fn reset(&mut self);
42}
43
44pub struct MutatorHost {
45 participant_id: ParticipantId,
46 pub mutation_conn: MutationParentConnection,
47 mutators: BTreeMap<MutatorId, Box<dyn Mutator + Send>>,
48 active_mutations: HashMap<MutatorId, HashSet<MutationId>>,
49
50 ingest: Option<super::ingest::Client>,
51 ingest_ordering: u128,
52 log_comms: bool,
53 log_inject_and_clear: bool,
54}
55
56impl MutatorHost {
57 pub async fn connect_and_authenticate(
58 endpoint: &Url,
59 allow_insecure_tls: bool,
60 auth_token: AuthToken,
61 mut ingest: Option<super::ingest::Client>,
62 ) -> Result<MutatorHost, MutationParentClientInitializationError> {
63 debug!(%endpoint, %allow_insecure_tls, "Connecting to mutation plane");
64
65 let mut ingest_ordering = 0u128;
66 if let Some(i) = ingest.as_mut() {
67 let tl_id = TimelineId::allocate();
68 i.switch_timeline(tl_id).await.unwrap();
69 i.send_timeline_attrs("MutatorHost", []).await.unwrap();
70 let _ = i
71 .send_event("connecting_to_mutation_plane", ingest_ordering, [])
72 .await;
73 ingest_ordering += 1;
74 }
75
76 let mut mutation_conn =
77 MutationParentConnection::connect(endpoint, allow_insecure_tls).await?;
78
79 let mut_plane_pid = ParticipantId::allocate();
80 debug!(%mut_plane_pid, "Authenticating");
81 if let Some(i) = ingest.as_mut() {
82 let _ = i
83 .send_event(
84 "authenticating",
85 ingest_ordering,
86 [("participant_id", mut_plane_pid.to_string().into())],
87 )
88 .await;
89 ingest_ordering += 1;
90 }
91
92 mutation_conn
93 .write_msg(&RootwardsMessage::ChildAuthAttempt {
94 child_participant_id: mut_plane_pid,
95 version: MUTATION_PROTOCOL_VERSION,
96 token: auth_token.as_ref().to_vec(),
97 })
98 .await;
99
100 debug!("Awaiting authentication response");
101 match mutation_conn.read_msg().await? {
102 LeafwardsMessage::ChildAuthOutcome {
103 child_participant_id,
104 version: _,
105 ok,
106 message,
107 } => {
108 if child_participant_id == mut_plane_pid {
109 if ok {
110 if let Some(i) = ingest.as_mut() {
111 let _ = i.send_event("authenticated", ingest_ordering, []).await;
112 ingest_ordering += 1;
113 }
114 } else {
115 if let Some(i) = ingest.as_mut() {
116 let _ = i
117 .send_event(
118 "authentication_failed",
119 ingest_ordering,
120 message.as_ref().map(|s| ("message", AttrVal::from(s))),
121 )
122 .await;
123 }
124 return Err(
125 MutationParentClientInitializationError::AuthenticationFailed(
126 message.unwrap_or_else(|| "(no message)".to_string()),
127 ),
128 );
129 }
130 } else {
131 if let Some(i) = ingest.as_mut() {
132 let _ = i
133 .send_event(
134 "authentication_failed",
135 ingest_ordering,
136 message.as_ref().map(|s| ("message", AttrVal::from(s))),
137 )
138 .await;
139 }
140 error!("Mutation plane auth outcome received for a different participant");
141 return Err(MutationParentClientInitializationError::AuthWrongParticipant);
142 }
143 }
144 resp => {
145 error!(?resp, "Mutation plane unexpected auth response");
146 return Err(MutationParentClientInitializationError::UnexpectedAuthResponse);
147 }
148 }
149
150 debug!("Authenticated");
151 let mut conn = MutatorHost {
152 participant_id: mut_plane_pid,
153 mutation_conn,
154 mutators: Default::default(),
155 active_mutations: Default::default(),
156
157 ingest,
158 ingest_ordering: 0,
159 log_comms: true,
160 log_inject_and_clear: true,
161 };
162
163 conn.send_event("mutation_plane_connected", []).await;
164 Ok(conn)
165 }
166
167 pub fn disable_mutation_communicated_logging(&mut self) {
169 self.log_comms = false;
170 }
171
172 pub fn disable_mutation_inject_and_clear_logging(&mut self) {
177 self.log_inject_and_clear = false;
178 }
179
180 pub async fn register_mutator(
181 &mut self,
182 mutator: Box<dyn Mutator + Send>,
183 ) -> Result<(), CommsError> {
184 let mutator_id = mutator.id();
185 let ann = mutator_announcement(self.participant_id, mutator.as_ref(), &mutator_id);
186 self.mutators.insert(mutator.id(), mutator);
187 self.mutation_conn.write_msg(&ann).await?;
188
189 self.send_event(
190 "modality.mutator.announced",
191 [("event.mutator.id", mutator_id_to_attr_val(mutator_id))],
192 )
193 .await;
194
195 Ok(())
196 }
197
198 pub async fn message_loop(&mut self) -> Result<(), CommsError> {
199 loop {
200 let msg = self.mutation_conn.read_msg().await?;
201 self.handle_message(msg).await;
202 }
203 }
204
205 pub async fn handle_message(&mut self, msg: LeafwardsMessage) {
206 trace!(?msg, "handle_message");
207 match msg {
208 LeafwardsMessage::RequestForMutatorAnnouncements {} => {
209 self.announce_all_mutators().await;
210 }
211
212 LeafwardsMessage::NewMutation {
213 mutator_id,
214 mutation_id,
215 maybe_trigger_mask: _,
216 params,
217 } => {
218 self.new_mutation(mutator_id, mutation_id, params).await;
219 }
220
221 LeafwardsMessage::ClearSingleMutation {
222 mutator_id,
223 mutation_id,
224 reset_if_active,
225 } => {
226 self.clear_single_mutation(mutator_id, mutation_id, reset_if_active)
227 .await;
228 }
229
230 LeafwardsMessage::ClearMutationsForMutator {
231 mutator_id,
232 reset_if_active,
233 } => {
234 self.clear_mutations_for_mutator(mutator_id, reset_if_active)
235 .await;
236 }
237
238 LeafwardsMessage::ClearMutations {} => {
239 self.clear_mutations().await;
240 }
241
242 LeafwardsMessage::UpdateTriggerState {
243 mutator_id: _,
244 mutation_id: _,
245 maybe_trigger_crdt: _,
246 } => {
247 }
249
250 _ => {
251 warn!("Unexpected message");
252 self.send_event("unexpected_message", []).await;
253 }
254 }
255 }
256
257 async fn announce_all_mutators(&mut self) {
258 let mut announces = Vec::with_capacity(self.mutators.len());
260 let mut mutator_ids = Vec::with_capacity(self.mutators.len());
261 for (mutator_id, mutator) in self.mutators.iter() {
262 let ann = mutator_announcement(self.participant_id, mutator.as_ref(), mutator_id);
263 announces.push(ann);
264 mutator_ids.push(*mutator_id);
265 }
266
267 for ann in announces.into_iter() {
268 if let Err(e) = self.mutation_conn.write_msg(&ann).await {
269 error!(
270 err = &e as &dyn std::error::Error,
271 "Failed to announce mutator; aborting batch announce"
272 );
273 return;
275 }
276 }
277
278 for mutator_id in mutator_ids.into_iter() {
279 self.send_event(
280 "modality.mutator.announced",
281 [("event.mutator.id", mutator_id_to_attr_val(mutator_id))],
282 )
283 .await;
284 }
285 }
286
287 async fn clear_single_mutation(
288 &mut self,
289 mutator_id: MutatorId,
290 mutation_id: MutationId,
291 reset_if_active: bool,
292 ) {
293 self.send_event(
294 "modality.mutation.clear_communicated",
295 [
296 ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
297 ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
298 ("event.mutation.success", true.into()),
299 ],
300 )
301 .await;
302
303 let Some(mutator) = self.mutators.get_mut(&mutator_id) else {
304 warn!(
305 %mutator_id,
306 %mutation_id,
307 "Cannot clear mutation, mutator is not hosted by this client"
308 );
309 return;
310 };
311
312 let Some(active_mutation_ids_for_mutator) = self.active_mutations.get_mut(&mutator_id)
313 else {
314 warn!(
315 %mutator_id,
316 %mutation_id,
317 "Cannot clear mutation, no active mutations for mutator"
318 );
319 return;
320 };
321
322 if !active_mutation_ids_for_mutator.remove(&mutation_id) {
323 warn!(
324 %mutator_id,
325 %mutation_id,
326 "Cannot clear mutation, mutation not active"
327 );
328 return;
329 }
330
331 tracing::debug!(%mutator_id, %mutation_id, "Clearing mutation");
332
333 mutator.clear_mutation(&mutation_id);
334 if reset_if_active {
335 mutator.reset();
336 }
337 }
338
339 async fn clear_mutations_for_mutator(&mut self, mutator_id: MutatorId, reset_if_active: bool) {
340 let Some(mutator) = self.mutators.get_mut(&mutator_id) else {
341 warn!(
342 %mutator_id,
343 "Cannot clear mutations, mutator is not hosted by this client"
344 );
345 return;
346 };
347
348 let Some(active_mutation_ids_for_mutator) = self.active_mutations.remove(&mutator_id)
349 else {
350 warn!(
351 %mutator_id,
352 "Cannot clear mutations, no active mutations for mutator"
353 );
354 return;
355 };
356
357 let mut cleared_mutations = vec![];
358 for mutation_id in active_mutation_ids_for_mutator.into_iter() {
359 cleared_mutations.push(mutation_id);
360 tracing::debug!(%mutator_id, %mutation_id, "Clearing mutation");
361 mutator.clear_mutation(&mutation_id);
362
363 if reset_if_active {
364 mutator.reset();
365 }
366 }
367
368 for mutation_id in cleared_mutations {
369 self.send_event(
370 "modality.mutation.clear_communicated",
371 [
372 ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
373 ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
374 ],
375 )
376 .await;
377 }
378 }
379
380 async fn clear_mutations(&mut self) {
381 let mut cleared_mutations = vec![];
382 for (mutator_id, active_mutation_ids_for_mutator) in self.active_mutations.drain() {
383 let Some(mutator) = self.mutators.get_mut(&mutator_id) else {
384 warn!(
385 %mutator_id,
386 "Inconsistent internal state; cannot clear mutations for unregistered mutator'"
387 );
388 continue;
389 };
390
391 for mutation_id in active_mutation_ids_for_mutator.into_iter() {
392 cleared_mutations.push((mutator_id, mutation_id));
393 mutator.clear_mutation(&mutation_id);
394 tracing::debug!(%mutator_id, %mutation_id, "Clearing mutation");
395 }
396
397 mutator.reset();
398 }
399
400 for (mutator_id, mutation_id) in cleared_mutations {
401 self.send_event(
402 "modality.mutation.clear_communicated",
403 [
404 ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
405 ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
406 ],
407 )
408 .await;
409 }
410 }
411
412 async fn new_mutation(
413 &mut self,
414 mutator_id: MutatorId,
415 mutation_id: crate::mutation_plane::types::MutationId,
416 params: crate::mutation_plane::types::AttrKvs,
417 ) {
418 self.send_event(
419 "modality.mutation.command_communicated",
420 [
421 ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
422 ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
423 ],
424 )
425 .await;
426
427 let Some(mutator) = self.mutators.get_mut(&mutator_id) else {
428 tracing::warn!(
429 mutator_id = %mutator_id,
430 "Failed to handle new mutation, mutator not hosted by this client");
431 return;
432 };
433
434 let success = mutator.inject(mutation_id, attr_kvs_to_map(params));
435 self.active_mutations
436 .entry(mutator_id)
437 .or_default()
438 .insert(mutation_id);
439
440 self.send_event(
441 "modality.mutation.injected",
442 [
443 ("event.mutator.id", mutator_id_to_attr_val(mutator_id)),
444 ("event.mutation.id", mutation_id_to_attr_val(mutation_id)),
445 ("event.mutation.success", success.into()),
446 ],
447 )
448 .await;
449 }
450
451 async fn send_event(&mut self, name: &str, attrs: impl IntoIterator<Item = (&str, AttrVal)>) {
452 let Some(i) = self.ingest.as_mut() else {
453 return;
454 };
455
456 let res = i.send_event(name, self.ingest_ordering, attrs).await;
457
458 if let Err(e) = res {
459 warn!(
460 err = &e as &dyn std::error::Error,
461 "Failed to send event to modality"
462 )
463 }
464
465 self.ingest_ordering += 1;
466 }
467}
468
469fn attr_kvs_to_map(
470 params: crate::mutation_plane::types::AttrKvs,
471) -> BTreeMap<String, crate::api::AttrVal> {
472 let mut map = BTreeMap::new();
473 for kv in params.0.into_iter() {
474 map.insert(kv.key, kv.value);
475 }
476 map
477}
478
479fn mutator_announcement(
480 participant_id: ParticipantId,
481 m: &(impl Mutator + ?Sized),
482 mutator_id: &MutatorId,
483) -> RootwardsMessage {
484 let mutator_attrs = m
485 .descriptor()
486 .get_description_attributes()
487 .map(|(k, value)| crate::mutation_plane::types::AttrKv {
488 key: k.to_string(),
489 value,
490 })
491 .collect();
492 RootwardsMessage::MutatorAnnouncement {
493 participant_id,
494 mutator_id: *mutator_id,
495 mutator_attrs: crate::mutation_plane::types::AttrKvs(mutator_attrs),
496 }
497}
498
499const MUTATION_PROTOCOL_PARENT_URL_ENV_VAR: &str = "MUTATION_PROTOCOL_PARENT_URL";
500const MUTATION_PROTOCOL_PARENT_URL_DEFAULT: &str = "modality-mutation://127.0.0.1:14192";
501
502fn mutation_proto_parent_url() -> Result<url::Url, MutationProtocolUrlError> {
503 match std::env::var(MUTATION_PROTOCOL_PARENT_URL_ENV_VAR) {
504 Ok(val) => Ok(Url::parse(&val)?),
505 Err(std::env::VarError::NotUnicode(_)) => {
506 Err(MutationProtocolUrlError::EnvVarSpecifiedMutationProtoParentUrlNonUtf8)
507 }
508 Err(std::env::VarError::NotPresent) => {
509 Ok(Url::parse(MUTATION_PROTOCOL_PARENT_URL_DEFAULT)?)
510 }
511 }
512}
513
514#[derive(Debug, thiserror::Error)]
515pub enum MutationProtocolUrlError {
516 #[error(
517 "The MUTATION_PROTOCOL_PARENT_URL environment variable contained a non-UTF-8-compatible string"
518 )]
519 EnvVarSpecifiedMutationProtoParentUrlNonUtf8,
520
521 #[error("Mutation protocol parent URL error")]
522 MutationProtoParentUrl(#[from] url::ParseError),
523}
524
525fn mutation_id_to_attr_val(mutation_id: MutationId) -> AttrVal {
526 uuid_to_integer_attr_val(mutation_id.as_ref())
527}
528
529pub fn mutator_id_to_attr_val(mutator_id: MutatorId) -> AttrVal {
530 uuid_to_integer_attr_val(mutator_id.as_ref())
531}
532
533fn uuid_to_integer_attr_val(u: &Uuid) -> AttrVal {
534 i128::from_le_bytes(*u.as_bytes()).into()
535}