1#![cfg_attr(not(unix), allow(unused))]
2use minicbor_io::{AsyncReader, AsyncWriter};
3use modality_mutation_plane::modality_mutator_protocol::attrs::{AttrKey, AttrVal};
4use modality_mutation_plane::protocol::{
5 LeafwardsMessage, RootwardsMessage, MUTATION_PROTOCOL_VERSION,
6};
7use modality_mutation_plane::types::{ParticipantId, TriggerCRDT};
8use std::collections::BTreeMap;
9use tokio::net::TcpStream;
10use tokio::sync::{broadcast, mpsc, oneshot};
11use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
12
13pub struct AuthReq {
14 pub is_direct: bool,
15 pub token: Vec<u8>,
16 pub participant_id: ParticipantId,
17 pub response_tx: oneshot::Sender<AuthResponse>,
18}
19
20#[derive(Debug)]
21pub enum AuthResponse {
22 DirectAuthOk {
23 connection_id: ChildConnectionId,
24 message: Option<String>,
25 rootwards_tx: mpsc::Sender<Rootwards>,
26 leafwards_rx: mpsc::Receiver<LeafwardsMessage>,
27 },
28 DelegatingAuthOk {
29 message: Option<String>,
30 },
31 NotAuth {
32 message: Option<String>,
33 },
34}
35#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
37pub struct ChildConnectionId(pub uuid::Uuid);
38
39impl std::fmt::Display for ChildConnectionId {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 uuid::fmt::Hyphenated::from_uuid(self.0).fmt(f)
42 }
43}
44
45#[derive(Debug)]
47pub enum Rootwards {
48 MutatorAnnouncement {
50 connection_id: ChildConnectionId,
51 participant_id: ParticipantId,
53 mutator_id: modality_mutation_plane::types::MutatorId,
54 mutator_attrs: BTreeMap<AttrKey, AttrVal>,
55 },
56 MutatorRetirement {
57 connection_id: ChildConnectionId,
58 participant_id: ParticipantId,
60 mutator_id: modality_mutation_plane::types::MutatorId,
61 },
62 UpdateTriggerState {
63 connection_id: ChildConnectionId,
64
65 mutator_id: modality_mutation_plane::types::MutatorId,
66 mutation_id: modality_mutation_plane::types::MutationId,
67 maybe_trigger_crdt: Option<TriggerCRDT>,
70 },
71}
72impl Rootwards {
73 pub fn connection_id(&self) -> ChildConnectionId {
74 match self {
75 Rootwards::MutatorAnnouncement { connection_id, .. } => *connection_id,
76 Rootwards::MutatorRetirement { connection_id, .. } => *connection_id,
77 Rootwards::UpdateTriggerState { connection_id, .. } => *connection_id,
78 }
79 }
80}
81
82pub async fn mutation_protocol_child_tcp_connection(
83 mut stream: TcpStream,
84 shutdown: broadcast::Receiver<()>,
85 auth_tx: mpsc::Sender<AuthReq>,
86) -> (
87 Option<ChildConnectionId>,
88 Result<(), Box<dyn std::error::Error>>,
89) {
90 let (reader, writer) = stream.split();
91 let msg_reader = AsyncReader::new(reader.compat());
92 let msg_writer = AsyncWriter::new(writer.compat_write());
93 mutation_protocol_child_connection(msg_reader, msg_writer, shutdown, auth_tx).await
94}
95
96#[cfg(unix)]
99pub async fn mutation_protocol_child_uds_connection(
100 mut stream: tokio::net::UnixStream,
101 shutdown: broadcast::Receiver<()>,
102 auth_tx: mpsc::Sender<AuthReq>,
103) -> (
104 Option<ChildConnectionId>,
105 Result<(), Box<dyn std::error::Error>>,
106) {
107 let (reader, writer) = stream.split();
108 let msg_reader = AsyncReader::new(reader.compat());
109 let msg_writer = AsyncWriter::new(writer.compat_write());
110 mutation_protocol_child_connection(msg_reader, msg_writer, shutdown, auth_tx).await
111}
112
113pub async fn mutation_protocol_child_connection<R, W>(
114 mut msg_reader: AsyncReader<R>,
115 mut msg_writer: AsyncWriter<W>,
116 mut shutdown_rx: broadcast::Receiver<()>,
117 auth_tx: mpsc::Sender<AuthReq>,
118) -> (
119 Option<ChildConnectionId>,
120 Result<(), Box<dyn std::error::Error>>,
121)
122where
123 R: futures::AsyncRead + Unpin,
124 W: futures::AsyncWrite + Unpin,
125{
126 let mut unauth_state = UnauthenticatedConnectionState { auth_tx };
127 let mut ready_state = loop {
128 tokio::select! {
129 msg = msg_reader.read::<RootwardsMessage>() => {
130 let msg = match msg {
131 Ok(Some(msg)) => msg,
132 Ok(None) => return (None, Ok(())),
133 Err(minicbor_io::Error::Decode(e)) => {
134 tracing::error!(
135 error = &e as &dyn std::error::Error,
136 "Dropping invalid message during unauth state"
137 );
138 continue;
139 }
140 Err(e) => return (None, Err(e.into())),
141 };
142
143 match unauth_state.handle_rootwards_message(msg).await {
144 UnauthenticatedMessageOutcome::Proceed { state, reply } => {
145 if let Err(e) = msg_writer.write(reply).await {
146 return (Some(state.connection_id), Err(e.into()));
147 }
148 break state;
149 }
150 UnauthenticatedMessageOutcome::StayPut { state, reply } => {
151 if let Err(e) = msg_writer.write(reply).await {
152 return (None, Err(e.into()));
153 }
154 unauth_state = state;
155 }
156 }
157 },
158 _ = shutdown_rx.recv() => {
159 tracing::info!("Mutation protocol child connection received shutdown request while still unauthenticated.");
160 return (None, Ok(()))
161 }
162 }
163 };
164 tracing::trace!("Mutation protocol client authenticated");
165
166 loop {
167 tokio::select! {
168 maybe_leafwards = ready_state.leafwards_rx.recv() => {
170 match maybe_leafwards {
171 Some(leafwards) => {
172 if let Err(e) = msg_writer.write(leafwards).await {
173 return (Some(ready_state.connection_id), Err(e.into()));
174 }
175 },
176 None => {
177 tracing::warn!("Internal leafwards channel closed early unexpectedly for mutation protocol child connection.");
178 return (Some(ready_state.connection_id), Ok(()));
179 }
180 }
181 },
182 maybe_rootwards_result = msg_reader.read::<RootwardsMessage>() => {
184 let msg: RootwardsMessage = match maybe_rootwards_result {
185 Ok(Some(msg)) => msg,
186 Ok(None) => return (Some(ready_state.connection_id), Ok(())),
187 Err(minicbor_io::Error::Decode(e)) => {
188 tracing::error!(error = &e as &dyn std::error::Error, "Dropping invalid message during ready state.");
189 continue;
190 }
191 Err(e) => return (Some(ready_state.connection_id), Err(e.into())),
192 };
193 let ReadyMessageOutcome {
194 reply_to_child, send_to_root
195 } = ready_state.handle_rootwards_message(msg).await;
196 if let Some(reply) = reply_to_child {
197 if let Err(e) = msg_writer.write(reply).await {
198 return (Some(ready_state.connection_id), Err(e.into()));
199 }
200 }
201 if let Some(rootwards) = send_to_root {
202 if let Err(e) = ready_state.rootwards_tx.send(rootwards).await {
203 tracing::error!(error = &e as &dyn std::error::Error, "Could not send rootwards message from child connection over internal channel.");
204 }
205 }
206 },
207 _ = shutdown_rx.recv() => {
208 tracing::info!("Mutation protocol child connection received shutdown request while in the ready state");
209 return (Some(ready_state.connection_id), Ok(()))
210 }
211 }
212 }
213}
214enum UnauthenticatedMessageOutcome {
216 Proceed {
217 state: ReadyConnectionState,
218 reply: LeafwardsMessage,
219 },
220 StayPut {
221 state: UnauthenticatedConnectionState,
222 reply: LeafwardsMessage,
223 },
224}
225struct UnauthenticatedConnectionState {
226 auth_tx: tokio::sync::mpsc::Sender<AuthReq>,
227}
228
229impl UnauthenticatedConnectionState {
230 async fn handle_rootwards_message(
231 self,
232 msg: RootwardsMessage,
233 ) -> UnauthenticatedMessageOutcome {
234 match msg {
235 RootwardsMessage::ChildAuthAttempt {
236 child_participant_id,
237 version,
238 token,
239 } => {
240 tracing::debug!(version = version, participant_id = %child_participant_id, "Auth attempt from unauthorized child connection");
241 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
242 if self
243 .auth_tx
244 .send(AuthReq {
245 is_direct: true,
249 token,
250 participant_id: child_participant_id,
251 response_tx,
252 })
253 .await
254 .is_err()
255 {
256 UnauthenticatedMessageOutcome::StayPut {
257 state: self,
258 reply: LeafwardsMessage::ChildAuthOutcome {
259 child_participant_id,
260 version: MUTATION_PROTOCOL_VERSION,
261 ok: false,
262 message: Some(
263 "Could not send auth request over internal channel".to_owned(),
264 ),
265 },
266 }
267 } else {
268 match response_rx.await {
269 Ok(resp) => {
270 match resp {
271 AuthResponse::DirectAuthOk { connection_id, message, rootwards_tx, leafwards_rx } => {
272 UnauthenticatedMessageOutcome::Proceed {
273 state: ReadyConnectionState {
274 connection_id,
275 auth_tx: self.auth_tx,
276 leafwards_rx,
277 rootwards_tx
278 },
279 reply: LeafwardsMessage::ChildAuthOutcome {
280 child_participant_id,
281 version: MUTATION_PROTOCOL_VERSION,
282 ok: true,
283 message
284 }
285 }
286 }
287 AuthResponse::DelegatingAuthOk { message } => {
288 UnauthenticatedMessageOutcome::StayPut {
289 state: self,
290 reply: LeafwardsMessage::ChildAuthOutcome {
291 child_participant_id,
292 version: MUTATION_PROTOCOL_VERSION,
293 ok: true,
294 message
295 }
296 }
297 }
298 AuthResponse::NotAuth { message } => {
299 UnauthenticatedMessageOutcome::StayPut { state: self, reply: LeafwardsMessage::ChildAuthOutcome {
300 child_participant_id,
301 version: MUTATION_PROTOCOL_VERSION,
302 ok: false,
303 message
304 } }
305 }
306 }
307 },
308 Err(_recv_err) => {
309 UnauthenticatedMessageOutcome::StayPut { state: self, reply: LeafwardsMessage::ChildAuthOutcome {
310 child_participant_id,
311 version: MUTATION_PROTOCOL_VERSION,
312 ok: false,
313 message:Some("Mutation plane child connection could not receive auth request over internal channel.".to_owned())
314 } }
315 }
316 }
317 }
318 }
319 _ => UnauthenticatedMessageOutcome::StayPut {
320 state: self,
321 reply: LeafwardsMessage::UnauthenticatedResponse {},
322 },
323 }
324 }
325}
326
327struct ReadyConnectionState {
328 connection_id: ChildConnectionId,
329 auth_tx: tokio::sync::mpsc::Sender<AuthReq>,
330 leafwards_rx: tokio::sync::mpsc::Receiver<LeafwardsMessage>,
331 rootwards_tx: tokio::sync::mpsc::Sender<Rootwards>,
332}
333
334struct ReadyMessageOutcome {
335 reply_to_child: Option<LeafwardsMessage>,
336 send_to_root: Option<Rootwards>, }
338
339impl ReadyConnectionState {
340 async fn handle_rootwards_message(&mut self, msg: RootwardsMessage) -> ReadyMessageOutcome {
341 match msg {
342 RootwardsMessage::ChildAuthAttempt {
343 child_participant_id,
344 version,
345 token,
346 } => {
347 tracing::debug!(version = version, participant_id = %child_participant_id, "Auth attempt from already-authorized child connection");
348 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
349 if self
350 .auth_tx
351 .send(AuthReq {
352 is_direct: false,
355 token,
356 participant_id: child_participant_id,
357 response_tx,
358 })
359 .await
360 .is_err()
361 {
362 ReadyMessageOutcome {
363 reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
364 child_participant_id,
365 version: MUTATION_PROTOCOL_VERSION,
366 ok: false,
367 message: Some(
368 "Could not send auth request over internal channel".to_owned(),
369 ),
370 }),
371 send_to_root: None,
372 }
373 } else {
374 match response_rx.await {
375 Ok(resp) => {
376 match resp {
377 AuthResponse::DirectAuthOk { connection_id: _, message, rootwards_tx: _, leafwards_rx : _} => {
378 ReadyMessageOutcome {
379 reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
380 child_participant_id,
381 version: MUTATION_PROTOCOL_VERSION,
382 ok: true,
383 message
384 }),
385 send_to_root: None
386 }
387 }
388 AuthResponse::DelegatingAuthOk { message } => {
389 ReadyMessageOutcome {
390 reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
391 child_participant_id,
392 version: MUTATION_PROTOCOL_VERSION,
393 ok: true,
394 message
395 }),
396 send_to_root: None
397 }
398 }
399 AuthResponse::NotAuth { message } => {
400 ReadyMessageOutcome { reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
401 child_participant_id,
402 version: MUTATION_PROTOCOL_VERSION,
403 ok: false,
404 message
405 }),
406 send_to_root: None
407 }
408 }
409 }
410 },
411 Err(_recv_err) => {
412 ReadyMessageOutcome { reply_to_child: Some(LeafwardsMessage::ChildAuthOutcome {
413 child_participant_id,
414 version: MUTATION_PROTOCOL_VERSION,
415 ok: false,
416 message:Some("Mutation plane child connection could not receive auth request over internal channel.".to_owned())
417 }),
418 send_to_root: None
419 }
420 }
421 }
422 }
423 }
424 RootwardsMessage::MutatorAnnouncement {
425 participant_id,
426 mutator_id,
427 mutator_attrs,
428 } => ReadyMessageOutcome {
429 reply_to_child: None,
430 send_to_root: Some(Rootwards::MutatorAnnouncement {
431 connection_id: self.connection_id,
432 participant_id,
433 mutator_id,
434 mutator_attrs: mutator_attrs
435 .0
436 .into_iter()
437 .map(|kv| (AttrKey::from(kv.key), kv.value))
438 .collect(),
439 }),
440 },
441 RootwardsMessage::MutatorRetirement {
442 participant_id,
443 mutator_id,
444 } => ReadyMessageOutcome {
445 reply_to_child: None,
446 send_to_root: Some(Rootwards::MutatorRetirement {
447 connection_id: self.connection_id,
448 participant_id,
449 mutator_id,
450 }),
451 },
452 RootwardsMessage::UpdateTriggerState {
453 mutator_id,
454 mutation_id,
455 maybe_trigger_crdt,
456 } => ReadyMessageOutcome {
457 reply_to_child: None,
458 send_to_root: Some(Rootwards::UpdateTriggerState {
459 connection_id: self.connection_id,
460 mutator_id,
461 mutation_id,
462 maybe_trigger_crdt,
463 }),
464 },
465 }
466 }
467}