modality_mutation_plane_client/
child_connection.rs

1#![cfg_attr(not(unix), allow(unused))]
2use minicbor_io::{AsyncReader, AsyncWriter};
3use modality_mutation_plane::modality_mutator_protocol::attrs::{AttrKey, AttrVal};
4use modality_mutation_plane::protocol::{
5    LeafwardsMessage, RootwardsMessage, MUTATION_PROTOCOL_VERSION,
6};
7use modality_mutation_plane::types::{ParticipantId, TriggerCRDT};
8use std::collections::BTreeMap;
9use tokio::net::TcpStream;
10use tokio::sync::{broadcast, mpsc, oneshot};
11use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
12
13pub struct AuthReq {
14    pub is_direct: bool,
15    pub token: Vec<u8>,
16    pub participant_id: ParticipantId,
17    pub response_tx: oneshot::Sender<AuthResponse>,
18}
19
20#[derive(Debug)]
21pub enum AuthResponse {
22    DirectAuthOk {
23        connection_id: ChildConnectionId,
24        message: Option<String>,
25        rootwards_tx: mpsc::Sender<Rootwards>,
26        leafwards_rx: mpsc::Receiver<LeafwardsMessage>,
27    },
28    DelegatingAuthOk {
29        message: Option<String>,
30    },
31    NotAuth {
32        message: Option<String>,
33    },
34}
35/// Opaque, internal-only id for child connections
36#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
37pub struct ChildConnectionId(pub uuid::Uuid);
38
39impl std::fmt::Display for ChildConnectionId {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        uuid::fmt::Hyphenated::from_uuid(self.0).fmt(f)
42    }
43}
44
45/// Messages from an already-authenticated child connection.
46#[derive(Debug)]
47pub enum Rootwards {
48    // Probably need to send ChildConnectionId along with all Rootwards messages
49    MutatorAnnouncement {
50        connection_id: ChildConnectionId,
51        /// Id of the participant in charge of managing the mutator
52        participant_id: ParticipantId,
53        mutator_id: modality_mutation_plane::types::MutatorId,
54        mutator_attrs: BTreeMap<AttrKey, AttrVal>,
55    },
56    MutatorRetirement {
57        connection_id: ChildConnectionId,
58        /// Id of the participant in charge of managing the mutator
59        participant_id: ParticipantId,
60        mutator_id: modality_mutation_plane::types::MutatorId,
61    },
62    UpdateTriggerState {
63        connection_id: ChildConnectionId,
64
65        mutator_id: modality_mutation_plane::types::MutatorId,
66        mutation_id: modality_mutation_plane::types::MutationId,
67        /// Interpret "None" as "clear your trigger state for this mutation, you don't need to
68        /// track it (anymore)"
69        maybe_trigger_crdt: Option<TriggerCRDT>,
70    },
71}
72impl Rootwards {
73    pub fn connection_id(&self) -> ChildConnectionId {
74        match self {
75            Rootwards::MutatorAnnouncement { connection_id, .. } => *connection_id,
76            Rootwards::MutatorRetirement { connection_id, .. } => *connection_id,
77            Rootwards::UpdateTriggerState { connection_id, .. } => *connection_id,
78        }
79    }
80}
81
82pub async fn mutation_protocol_child_tcp_connection(
83    mut stream: TcpStream,
84    shutdown: broadcast::Receiver<()>,
85    auth_tx: mpsc::Sender<AuthReq>,
86) -> (
87    Option<ChildConnectionId>,
88    Result<(), Box<dyn std::error::Error>>,
89) {
90    let (reader, writer) = stream.split();
91    let msg_reader = AsyncReader::new(reader.compat());
92    let msg_writer = AsyncWriter::new(writer.compat_write());
93    mutation_protocol_child_connection(msg_reader, msg_writer, shutdown, auth_tx).await
94}
95
96/// This code LOOKS the same as tcp_connection, but none of the TYPES are common. So we get to
97/// duplicate it.
98#[cfg(unix)]
99pub async fn mutation_protocol_child_uds_connection(
100    mut stream: tokio::net::UnixStream,
101    shutdown: broadcast::Receiver<()>,
102    auth_tx: mpsc::Sender<AuthReq>,
103) -> (
104    Option<ChildConnectionId>,
105    Result<(), Box<dyn std::error::Error>>,
106) {
107    let (reader, writer) = stream.split();
108    let msg_reader = AsyncReader::new(reader.compat());
109    let msg_writer = AsyncWriter::new(writer.compat_write());
110    mutation_protocol_child_connection(msg_reader, msg_writer, shutdown, auth_tx).await
111}
112
113pub async fn mutation_protocol_child_connection<R, W>(
114    mut msg_reader: AsyncReader<R>,
115    mut msg_writer: AsyncWriter<W>,
116    mut shutdown_rx: broadcast::Receiver<()>,
117    auth_tx: mpsc::Sender<AuthReq>,
118) -> (
119    Option<ChildConnectionId>,
120    Result<(), Box<dyn std::error::Error>>,
121)
122where
123    R: futures::AsyncRead + Unpin,
124    W: futures::AsyncWrite + Unpin,
125{
126    let mut unauth_state = UnauthenticatedConnectionState { auth_tx };
127    let mut ready_state = loop {
128        tokio::select! {
129            msg = msg_reader.read::<RootwardsMessage>() => {
130                let msg = match msg {
131                    Ok(Some(msg)) => msg,
132                    Ok(None) => return (None, Ok(())),
133                    Err(minicbor_io::Error::Decode(e)) => {
134                        tracing::error!(
135                            error = &e as &dyn std::error::Error,
136                            "Dropping invalid message during unauth state"
137                        );
138                        continue;
139                    }
140                    Err(e) => return (None, Err(e.into())),
141                };
142
143                match unauth_state.handle_rootwards_message(msg).await {
144                    UnauthenticatedMessageOutcome::Proceed { state, reply } => {
145                        if let Err(e) = msg_writer.write(reply).await {
146                            return (Some(state.connection_id), Err(e.into()));
147                        }
148                        break state;
149                    }
150                    UnauthenticatedMessageOutcome::StayPut { state, reply } => {
151                        if let Err(e) = msg_writer.write(reply).await {
152                            return (None, Err(e.into()));
153                        }
154                        unauth_state = state;
155                    }
156                }
157            },
158            _ = shutdown_rx.recv() => {
159                tracing::info!("Mutation protocol child connection received shutdown request while still unauthenticated.");
160                return (None, Ok(()))
161            }
162        }
163    };
164    tracing::trace!("Mutation protocol client authenticated");
165
166    loop {
167        tokio::select! {
168            // Pull from the channel coming from parent
169            maybe_leafwards = ready_state.leafwards_rx.recv() => {
170                match maybe_leafwards {
171                    Some(leafwards) => {
172                        if let Err(e) = msg_writer.write(leafwards).await {
173                            return (Some(ready_state.connection_id), Err(e.into()));
174                        }
175                    },
176                    None => {
177                        tracing::warn!("Internal leafwards channel closed early unexpectedly for mutation protocol child connection.");
178                        return (Some(ready_state.connection_id), Ok(()));
179                    }
180                }
181            },
182            // Pull from the network coming from child
183            maybe_rootwards_result = msg_reader.read::<RootwardsMessage>() => {
184                let msg: RootwardsMessage = match maybe_rootwards_result {
185                    Ok(Some(msg)) => msg,
186                    Ok(None) => return (Some(ready_state.connection_id), Ok(())),
187                    Err(minicbor_io::Error::Decode(e)) => {
188                        tracing::error!(error = &e as &dyn std::error::Error, "Dropping invalid message during ready state.");
189                        continue;
190                    }
191                    Err(e) => return (Some(ready_state.connection_id), Err(e.into())),
192                };
193                let ReadyMessageOutcome {
194                    reply_to_child, send_to_root
195                } = ready_state.handle_rootwards_message(msg).await;
196                if let Some(reply) = reply_to_child {
197                    if let Err(e) = msg_writer.write(reply).await {
198                        return (Some(ready_state.connection_id), Err(e.into()));
199                    }
200                }
201                if let Some(rootwards) = send_to_root {
202                    if let Err(e) = ready_state.rootwards_tx.send(rootwards).await {
203                        tracing::error!(error = &e as &dyn std::error::Error, "Could not send rootwards message from child connection over internal channel.");
204                    }
205                }
206            },
207            _ = shutdown_rx.recv() => {
208                tracing::info!("Mutation protocol child connection received shutdown request while in the ready state");
209                return (Some(ready_state.connection_id), Ok(()))
210            }
211        }
212    }
213}
214/// After handling a message, a connection can stay unauthenticated, or can move forward to 'ready'.
215enum UnauthenticatedMessageOutcome {
216    Proceed {
217        state: ReadyConnectionState,
218        reply: LeafwardsMessage,
219    },
220    StayPut {
221        state: UnauthenticatedConnectionState,
222        reply: LeafwardsMessage,
223    },
224}
225struct UnauthenticatedConnectionState {
226    auth_tx: tokio::sync::mpsc::Sender<AuthReq>,
227}
228
229impl UnauthenticatedConnectionState {
230    async fn handle_rootwards_message(
231        self,
232        msg: RootwardsMessage,
233    ) -> UnauthenticatedMessageOutcome {
234        match msg {
235            RootwardsMessage::ChildAuthAttempt {
236                child_participant_id,
237                version,
238                token,
239            } => {
240                tracing::debug!(version = version, participant_id = %child_participant_id, "Auth attempt from unauthorized child connection");
241                let (response_tx, response_rx) = tokio::sync::oneshot::channel();
242                if self
243                    .auth_tx
244                    .send(AuthReq {
245                        // We are trying to auth the child participant
246                        // sitting at the top of the participant tree directly
247                        // on the other side of this network connection
248                        is_direct: true,
249                        token,
250                        participant_id: child_participant_id,
251                        response_tx,
252                    })
253                    .await
254                    .is_err()
255                {
256                    UnauthenticatedMessageOutcome::StayPut {
257                        state: self,
258                        reply: LeafwardsMessage::ChildAuthOutcome {
259                            child_participant_id,
260                            version: MUTATION_PROTOCOL_VERSION,
261                            ok: false,
262                            message: Some(
263                                "Could not send auth request over internal channel".to_owned(),
264                            ),
265                        },
266                    }
267                } else {
268                    match response_rx.await {
269                        Ok(resp) => {
270                            match resp {
271                                AuthResponse::DirectAuthOk { connection_id, message, rootwards_tx, leafwards_rx } => {
272                                    UnauthenticatedMessageOutcome::Proceed {
273                                        state: ReadyConnectionState {
274                                            connection_id,
275                                            auth_tx: self.auth_tx,
276                                            leafwards_rx,
277                                            rootwards_tx
278                                        },
279                                        reply: LeafwardsMessage::ChildAuthOutcome {
280                                            child_participant_id,
281                                            version: MUTATION_PROTOCOL_VERSION,
282                                            ok: true,
283                                            message
284                                        }
285                                    }
286                                }
287                                AuthResponse::DelegatingAuthOk { message } => {
288                                    UnauthenticatedMessageOutcome::StayPut {
289                                        state: self,
290                                        reply: LeafwardsMessage::ChildAuthOutcome {
291                                            child_participant_id,
292                                            version: MUTATION_PROTOCOL_VERSION,
293                                            ok: true,
294                                            message
295                                        }
296                                    }
297                                }
298                                AuthResponse::NotAuth { message } => {
299                                    UnauthenticatedMessageOutcome::StayPut { state: self, reply: LeafwardsMessage::ChildAuthOutcome {
300                                        child_participant_id,
301                                        version: MUTATION_PROTOCOL_VERSION,
302                                        ok: false,
303                                        message
304                                    } }
305                                }
306                            }
307                        },
308                        Err(_recv_err) => {
309                            UnauthenticatedMessageOutcome::StayPut { state: self, reply: LeafwardsMessage::ChildAuthOutcome {
310                                child_participant_id,
311                                version: MUTATION_PROTOCOL_VERSION,
312                                ok: false,
313                                message:Some("Mutation plane child connection could not receive auth request over internal channel.".to_owned())
314                            } }
315                        }
316                    }
317                }
318            }
319            _ => UnauthenticatedMessageOutcome::StayPut {
320                state: self,
321                reply: LeafwardsMessage::UnauthenticatedResponse {},
322            },
323        }
324    }
325}
326
327struct ReadyConnectionState {
328    connection_id: ChildConnectionId,
329    auth_tx: tokio::sync::mpsc::Sender<AuthReq>,
330    leafwards_rx: tokio::sync::mpsc::Receiver<LeafwardsMessage>,
331    rootwards_tx: tokio::sync::mpsc::Sender<Rootwards>,
332}
333
334struct ReadyMessageOutcome {
335    reply_to_child: Option<LeafwardsMessage>,
336    send_to_root: Option<Rootwards>, // Maybe upwards things? Maybe deal with it all inline?
337}
338
339impl ReadyConnectionState {
340    async fn handle_rootwards_message(&mut self, msg: RootwardsMessage) -> ReadyMessageOutcome {
341        match msg {
342            RootwardsMessage::ChildAuthAttempt {
343                child_participant_id,
344                version,
345                token,
346            } => {
347                tracing::debug!(version = version, participant_id = %child_participant_id, "Auth attempt from already-authorized child connection");
348                let (response_tx, response_rx) = tokio::sync::oneshot::channel();
349                if self
350                    .auth_tx
351                    .send(AuthReq {
352                        // We are passing along an auth request from a further descendant in the
353                        // participant tree.
354                        is_direct: false,
355                        token,
356                        participant_id: child_participant_id,
357                        response_tx,
358                    })
359                    .await
360                    .is_err()
361                {
362                    ReadyMessageOutcome {
363                        reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
364                            child_participant_id,
365                            version: MUTATION_PROTOCOL_VERSION,
366                            ok: false,
367                            message: Some(
368                                "Could not send auth request over internal channel".to_owned(),
369                            ),
370                        }),
371                        send_to_root: None,
372                    }
373                } else {
374                    match response_rx.await {
375                        Ok(resp) => {
376                            match resp {
377                                AuthResponse::DirectAuthOk { connection_id: _, message, rootwards_tx: _, leafwards_rx : _} => {
378                                    ReadyMessageOutcome {
379                                        reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
380                                            child_participant_id,
381                                            version: MUTATION_PROTOCOL_VERSION,
382                                            ok: true,
383                                            message
384                                        }),
385                                        send_to_root: None
386                                    }
387                                }
388                                AuthResponse::DelegatingAuthOk { message } => {
389                                    ReadyMessageOutcome {
390                                        reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
391                                            child_participant_id,
392                                            version: MUTATION_PROTOCOL_VERSION,
393                                            ok: true,
394                                            message
395                                        }),
396                                        send_to_root: None
397                                    }
398                                }
399                                AuthResponse::NotAuth { message } => {
400                                    ReadyMessageOutcome { reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
401                                        child_participant_id,
402                                        version: MUTATION_PROTOCOL_VERSION,
403                                        ok: false,
404                                        message
405                                    }),
406                                        send_to_root: None
407                                    }
408                                }
409                            }
410                        },
411                        Err(_recv_err) => {
412                            ReadyMessageOutcome { reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
413                                child_participant_id,
414                                version: MUTATION_PROTOCOL_VERSION,
415                                ok: false,
416                                message:Some("Mutation plane child connection could not receive auth request over internal channel.".to_owned())
417                            }),
418                                send_to_root: None
419                            }
420                        }
421                    }
422                }
423            }
424            RootwardsMessage::MutatorAnnouncement {
425                participant_id,
426                mutator_id,
427                mutator_attrs,
428            } => ReadyMessageOutcome {
429                reply_to_child: None,
430                send_to_root: Some(Rootwards::MutatorAnnouncement {
431                    connection_id: self.connection_id,
432                    participant_id,
433                    mutator_id,
434                    mutator_attrs: mutator_attrs
435                        .0
436                        .into_iter()
437                        .map(|kv| (AttrKey::from(kv.key), kv.value))
438                        .collect(),
439                }),
440            },
441            RootwardsMessage::MutatorRetirement {
442                participant_id,
443                mutator_id,
444            } => ReadyMessageOutcome {
445                reply_to_child: None,
446                send_to_root: Some(Rootwards::MutatorRetirement {
447                    connection_id: self.connection_id,
448                    participant_id,
449                    mutator_id,
450                }),
451            },
452            RootwardsMessage::UpdateTriggerState {
453                mutator_id,
454                mutation_id,
455                maybe_trigger_crdt,
456            } => ReadyMessageOutcome {
457                reply_to_child: None,
458                send_to_root: Some(Rootwards::UpdateTriggerState {
459                    connection_id: self.connection_id,
460                    mutator_id,
461                    mutation_id,
462                    maybe_trigger_crdt,
463                }),
464            },
465        }
466    }
467}