Skip to main content

aura_agent/runtime/effects/
choreography.rs

1use super::{AuraEffectSystem, CHOREO_FLOW_COST_PER_KB, DEFAULT_CHOREO_FLOW_COST};
2use async_trait::async_trait;
3use aura_chat::capabilities::ChatCapability;
4use aura_core::effects::transport::{TransportEnvelope, TransportReceipt};
5use aura_core::effects::{PhysicalTimeEffects, TransportEffects, WakeCondition};
6use aura_core::hash::hash;
7use aura_core::{AuthorityId, ContextId, FlowCost};
8use aura_guards::prelude::create_send_guard_op;
9use aura_guards::{GuardOperation, JournalCoupler};
10use aura_protocol::effects::{
11    ChoreographicEffects, ChoreographicRole, ChoreographyError, ChoreographyEvent,
12    ChoreographyMetrics, RoleIndex,
13};
14use std::collections::HashMap;
15
16use crate::runtime::subsystems::choreography::RuntimeChoreographySessionId;
17use crate::runtime::subsystems::choreography::SessionStartError;
18
19fn current_session_snapshot(
20    effects: &AuraEffectSystem,
21) -> Result<crate::runtime::subsystems::choreography::ChoreographySessionState, ChoreographyError> {
22    effects
23        .choreography_state
24        .read()
25        .current_session()
26        .ok_or(ChoreographyError::SessionNotStarted)
27}
28
29fn take_session_envelope(
30    effects: &AuraEffectSystem,
31    session_id: RuntimeChoreographySessionId,
32    source: AuthorityId,
33    context: ContextId,
34) -> Option<TransportEnvelope> {
35    let self_device_id = effects.config.device_id.to_string();
36    effects
37        .choreography_state
38        .write()
39        .take_matching_session_envelope(
40            session_id,
41            source,
42            context,
43            effects.authority_id,
44            &self_device_id,
45        )
46}
47
48fn promote_shared_session_envelopes(
49    effects: &AuraEffectSystem,
50    session_id: RuntimeChoreographySessionId,
51) {
52    let Some(shared) = effects.transport.shared_transport() else {
53        return;
54    };
55    let session_ref = session_id.to_string();
56    let inbox = shared.inbox_for(effects.authority_id);
57    let mut inbox = inbox.write();
58    let mut promoted = Vec::new();
59
60    let mut index = 0usize;
61    while index < inbox.len() {
62        let matches_session = inbox[index]
63            .metadata
64            .get("content-type")
65            .is_some_and(|value| value == "application/aura-choreography")
66            && inbox[index]
67                .metadata
68                .get("session-id")
69                .is_some_and(|value| value == &session_ref);
70        if matches_session {
71            promoted.push(inbox.remove(index));
72        } else {
73            index += 1;
74        }
75    }
76    drop(inbox);
77
78    if promoted.is_empty() {
79        return;
80    }
81
82    let mut state = effects.choreography_state.write();
83    for envelope in promoted {
84        state.queue_session_envelope(session_id, envelope);
85    }
86}
87
88// Implementation of ChoreographicEffects
89#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
90#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
91impl ChoreographicEffects for AuraEffectSystem {
92    async fn send_to_role_bytes(
93        &self,
94        role: ChoreographicRole,
95        message: Vec<u8>,
96    ) -> Result<(), ChoreographyError> {
97        let session = current_session_snapshot(self)?;
98        let context_id = session.context_id;
99        let current_role = session.current_role;
100
101        let peer = role.authority_id;
102        tracing::debug!(
103            session_id = %session.session_id,
104            from = ?current_role.device_id,
105            to = ?role.device_id,
106            peer = %peer,
107            ?context_id,
108            bytes = message.len(),
109            "choreography send"
110        );
111        let kb_units = ((message.len() as u32).saturating_add(1023)) / 1024;
112        let flow_cost = DEFAULT_CHOREO_FLOW_COST
113            .saturating_add(kb_units.saturating_mul(CHOREO_FLOW_COST_PER_KB));
114
115        let guard_chain = create_send_guard_op(
116            GuardOperation::Custom(ChatCapability::MessageSend.as_name().to_string()),
117            context_id,
118            peer,
119            FlowCost::new(flow_cost),
120        )
121        .with_operation_id(format!(
122            "choreography_send_{}_{}_{:?}",
123            session.session_id, context_id, role.device_id
124        ));
125
126        let guard_result =
127            guard_chain
128                .evaluate(self)
129                .await
130                .map_err(|e| ChoreographyError::InternalError {
131                    message: format!("Choreography send guard failed: {e}"),
132                })?;
133
134        if !guard_result.authorized {
135            return Err(ChoreographyError::InternalError {
136                message: guard_result
137                    .denial_reason
138                    .unwrap_or_else(|| "Choreography send denied by guard chain".to_string()),
139            });
140        }
141
142        JournalCoupler::new()
143            .couple_with_send(self, &guard_result.receipt)
144            .await
145            .map_err(|e| ChoreographyError::InternalError {
146                message: format!("Choreography journal coupling failed: {e}"),
147            })?;
148
149        let transport_receipt = guard_result
150            .receipt
151            .as_ref()
152            .map(|receipt| TransportReceipt {
153                context: receipt.ctx,
154                src: receipt.src,
155                dst: receipt.dst,
156                epoch: receipt.epoch.value(),
157                cost: receipt.cost.value(),
158                nonce: receipt.nonce.value(),
159                prev: receipt.prev.0,
160                sig: receipt.sig.clone().into_bytes(),
161            });
162
163        // Include choreography metadata so receivers can identify and route these messages
164        let mut metadata = HashMap::new();
165        metadata.insert(
166            "content-type".to_string(),
167            "application/aura-choreography".to_string(),
168        );
169        metadata.insert("session-id".to_string(), session.session_id.to_string());
170        metadata.insert(
171            "aura-source-device-id".to_string(),
172            current_role.device_id.to_string(),
173        );
174        metadata.insert(
175            "aura-destination-device-id".to_string(),
176            role.device_id.to_string(),
177        );
178        if let Some(protocol_id) = session.protocol_id.as_ref() {
179            metadata.insert("protocol-id".to_string(), protocol_id.clone());
180        }
181
182        let envelope = TransportEnvelope {
183            destination: peer,
184            source: current_role.authority_id,
185            context: context_id,
186            payload: message,
187            metadata,
188            receipt: transport_receipt,
189        };
190
191        TransportEffects::send_envelope(self, envelope)
192            .await
193            .map_err(|e| ChoreographyError::Transport {
194                source: Box::new(e),
195            })?;
196
197        {
198            let mut state = self.choreography_state.write();
199            state
200                .with_current_session_mut(|session| {
201                    session.metrics.messages_sent = session.metrics.messages_sent.saturating_add(1);
202                })
203                .map_err(|message| ChoreographyError::InternalError { message })?;
204        }
205        Ok(())
206    }
207
208    async fn receive_from_role_bytes(
209        &self,
210        role: ChoreographicRole,
211    ) -> Result<Vec<u8>, ChoreographyError> {
212        let session = current_session_snapshot(self)?;
213        let context_id = session.context_id;
214        let session_id = session.session_id;
215        let session_inbox_notify = self
216            .choreography_state
217            .read()
218            .session_inbox_notify(session_id);
219        let shared_inbox_notify = self
220            .transport
221            .shared_transport()
222            .map(|shared| shared.inbox_notify(self.authority_id));
223
224        // Wait on the session-local inbox notifier instead of polling the global inbox.
225        // Default timeout remains 5 seconds to allow async guardians time to respond.
226        let timeout_ms = session.timeout_ms.unwrap_or(5000);
227        let timeout_handle = self
228            .time_handler
229            .set_timeout(timeout_ms)
230            .await
231            .map_err(|error| ChoreographyError::InternalError {
232                message: format!("failed to issue receive timeout witness: {error}"),
233            })?;
234
235        let source_authority = role.authority_id;
236        tracing::debug!(
237            session_id = %session_id,
238            "Choreography receive: waiting for message from {:?} (authority {:?}) in context {:?}, timeout={}ms",
239            role.device_id,
240            source_authority,
241            context_id,
242            timeout_ms
243        );
244
245        let envelope = loop {
246            if let Some(env) = take_session_envelope(self, session_id, source_authority, context_id)
247            {
248                self.transport.record_receive();
249                break env;
250            }
251
252            promote_shared_session_envelopes(self, session_id);
253            if let Some(env) = take_session_envelope(self, session_id, source_authority, context_id)
254            {
255                self.transport.record_receive();
256                break env;
257            }
258
259            let Some(session_inbox_notify) = session_inbox_notify.clone() else {
260                let _ = self.time_handler.cancel_timeout(timeout_handle).await;
261                return Err(ChoreographyError::InternalError {
262                    message: format!(
263                        "missing choreography inbox notifier for active session {session_id}"
264                    ),
265                });
266            };
267
268            tokio::select! {
269                _ = session_inbox_notify.notified() => {}
270                _ = async {
271                    if let Some(shared_inbox_notify) = shared_inbox_notify.clone() {
272                        shared_inbox_notify.notified().await;
273                    } else {
274                        std::future::pending::<()>().await;
275                    }
276                } => {}
277                timeout_result = self.time_handler.yield_until(WakeCondition::TimeoutExpired {
278                    timeout_id: timeout_handle,
279                }) => {
280                    if let Err(error) = timeout_result {
281                        return Err(ChoreographyError::InternalError {
282                            message: format!("receive timeout witness failed: {error}"),
283                        });
284                    }
285                    let mut state = self.choreography_state.write();
286                    let _ = state.with_current_session_mut(|session| {
287                        session.metrics.timeout_count = session.metrics.timeout_count.saturating_add(1);
288                    });
289                    return Err(ChoreographyError::Transport {
290                        source: Box::new(aura_core::effects::TransportError::NoMessage),
291                    });
292                }
293            }
294
295            if !self.choreography_state.read().is_active() {
296                let _ = self.time_handler.cancel_timeout(timeout_handle).await;
297                return Err(ChoreographyError::SessionNotStarted);
298            }
299            if self
300                .choreography_state
301                .read()
302                .current_session_id()
303                .is_some_and(|active| active != session_id)
304            {
305                let _ = self.time_handler.cancel_timeout(timeout_handle).await;
306                return Err(ChoreographyError::InternalError {
307                    message: format!(
308                        "choreography session binding changed while waiting for receive: {session_id}"
309                    ),
310                });
311            }
312        };
313
314        let _ = self.time_handler.cancel_timeout(timeout_handle).await;
315
316        {
317            let mut state = self.choreography_state.write();
318            state
319                .with_current_session_mut(|session| {
320                    session.metrics.messages_received =
321                        session.metrics.messages_received.saturating_add(1);
322                })
323                .map_err(|message| ChoreographyError::InternalError { message })?;
324        }
325
326        Ok(envelope.payload)
327    }
328
329    async fn broadcast_bytes(&self, message: Vec<u8>) -> Result<(), ChoreographyError> {
330        let session = current_session_snapshot(self)?;
331        let roles = session.roles.clone();
332        let current_role = session.current_role;
333
334        for role in roles {
335            if role == current_role {
336                continue;
337            }
338            self.send_to_role_bytes(role, message.clone()).await?;
339        }
340
341        Ok(())
342    }
343
344    #[allow(clippy::disallowed_methods)]
345    fn current_role(&self) -> ChoreographicRole {
346        current_session_snapshot(self).map_or_else(
347            |_| {
348                let role_index = RoleIndex::new(0).expect("role index");
349                ChoreographicRole::with_authority(
350                    self.config.device_id(),
351                    self.authority_id,
352                    role_index,
353                )
354            },
355            |session| session.current_role,
356        )
357    }
358
359    fn all_roles(&self) -> Vec<ChoreographicRole> {
360        current_session_snapshot(self).map_or_else(
361            |_| vec![self.current_role()],
362            |session| {
363                if session.roles.is_empty() {
364                    vec![self.current_role()]
365                } else {
366                    session.roles
367                }
368            },
369        )
370    }
371
372    async fn is_role_active(&self, role: ChoreographicRole) -> bool {
373        let context_id = match current_session_snapshot(self) {
374            Ok(session) => session.context_id,
375            Err(_) => return false,
376        };
377
378        TransportEffects::is_channel_established(self, context_id, role.authority_id).await
379    }
380
381    async fn start_session(
382        &self,
383        session_id: uuid::Uuid,
384        roles: Vec<ChoreographicRole>,
385    ) -> Result<(), ChoreographyError> {
386        let runtime_session_id = RuntimeChoreographySessionId::from_uuid(session_id);
387        let current_device = self.config.device_id();
388        let current_role = roles
389            .iter()
390            .find(|role| role.device_id == current_device)
391            .or_else(|| {
392                roles
393                    .iter()
394                    .find(|role| role.authority_id == self.authority_id)
395            })
396            .copied()
397            .ok_or_else(|| {
398                let role_index = RoleIndex::new(0).expect("role index");
399                ChoreographyError::RoleNotFound {
400                    role: ChoreographicRole::with_authority(
401                        current_device,
402                        self.authority_id,
403                        role_index,
404                    ),
405                }
406            })?;
407
408        // Each runtime choreography session gets its own derived relational context so
409        // guard, leakage, and journal coupling stay isolated under concurrent execution.
410        let context_id = ContextId::new_from_entropy(hash(session_id.as_bytes()));
411        tracing::debug!(
412            "Choreography start_session: session_id={}, context_id={:?}, authority={:?}, roles={:?}",
413            runtime_session_id,
414            context_id,
415            self.authority_id,
416            roles.iter().map(|r| r.device_id).collect::<Vec<_>>()
417        );
418        let started_at_ms = self
419            .physical_time()
420            .await
421            .map(|time| time.ts_ms)
422            .unwrap_or_default();
423
424        let mut state = self.choreography_state.write();
425        state
426            .start_session(
427                runtime_session_id,
428                None,
429                context_id,
430                roles,
431                current_role,
432                None,
433                started_at_ms,
434            )
435            .map_err(|error| match error {
436                SessionStartError::SessionAlreadyExists { .. } => {
437                    ChoreographyError::SessionAlreadyExists { session_id }
438                }
439                SessionStartError::TaskAlreadyBound { .. } => ChoreographyError::InternalError {
440                    message: error.to_string(),
441                },
442            })
443    }
444
445    async fn end_session(&self) -> Result<(), ChoreographyError> {
446        let ended_at_ms = self
447            .physical_time()
448            .await
449            .map(|time| time.ts_ms)
450            .unwrap_or_default();
451
452        let mut state = self.choreography_state.write();
453        let ended_session_id = state
454            .end_session(ended_at_ms)
455            .map_err(|_| ChoreographyError::SessionNotStarted)?;
456        drop(state);
457        let _released_fragments = self.release_vm_fragments_for_session(ended_session_id);
458        Ok(())
459    }
460
461    async fn emit_choreo_event(&self, event: ChoreographyEvent) -> Result<(), ChoreographyError> {
462        tracing::debug!(?event, "choreography event");
463        Ok(())
464    }
465
466    async fn set_timeout(&self, timeout_ms: u64) {
467        let mut state = self.choreography_state.write();
468        let _ = state.with_current_session_mut(|session| {
469            session.timeout_ms = Some(timeout_ms);
470        });
471    }
472
473    async fn get_metrics(&self) -> ChoreographyMetrics {
474        current_session_snapshot(self).map_or_else(|_| default_metrics(), |session| session.metrics)
475    }
476}
477
478fn default_metrics() -> ChoreographyMetrics {
479    ChoreographyMetrics {
480        messages_sent: 0,
481        messages_received: 0,
482        avg_latency_ms: 0.0,
483        timeout_count: 0,
484        retry_count: 0,
485        total_duration_ms: 0,
486    }
487}
488
489#[cfg(test)]
490mod tests {
491    use super::*;
492    use crate::core::AgentConfig;
493    use aura_core::DeviceId;
494    use std::sync::Arc;
495    use std::time::Duration;
496    use tokio::sync::Barrier;
497    use uuid::Uuid;
498
499    async fn assert_settles_within<T, E: std::fmt::Debug>(
500        future: impl std::future::Future<Output = Result<T, E>>,
501        timeout: Duration,
502        message: &str,
503    ) -> Result<T, E> {
504        let time = aura_effects::time::PhysicalTimeHandler::new();
505        let started_at = time
506            .physical_time()
507            .await
508            .expect("physical time should be available");
509        let budget = aura_core::TimeoutBudget::from_start_and_timeout(&started_at, timeout)
510            .expect("timeout budget should fit");
511        match aura_core::execute_with_timeout_budget(&time, &budget, || future).await {
512            Ok(value) => Ok(value),
513            Err(aura_core::TimeoutRunError::Operation(error)) => Err(error),
514            Err(aura_core::TimeoutRunError::Timeout(error)) => {
515                panic!("{message}: timeout budget exceeded: {error:?}")
516            }
517        }
518    }
519
520    fn test_effects(authority_id: AuthorityId) -> Arc<AuraEffectSystem> {
521        let authority_bytes = authority_id.to_bytes();
522        let seed_salt = u64::from_le_bytes(authority_bytes[..8].try_into().expect("salt bytes"));
523        Arc::new(
524            AuraEffectSystem::simulation_for_test_for_authority_with_salt(
525                &AgentConfig::default(),
526                authority_id,
527                seed_salt,
528            )
529            .expect("testing effect system"),
530        )
531    }
532
533    fn authority_device_role(authority_id: AuthorityId, role_index: u16) -> ChoreographicRole {
534        ChoreographicRole::for_authority(
535            authority_id,
536            RoleIndex::new(role_index.into()).expect("role index"),
537        )
538    }
539
540    #[tokio::test]
541    async fn concurrent_sessions_are_isolated_per_task() {
542        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([7; 16]));
543        let effects = test_effects(authority_id);
544        let barrier = Arc::new(Barrier::new(3));
545
546        let session_a = Uuid::from_u128(1);
547        let session_b = Uuid::from_u128(2);
548        let peer_a = ChoreographicRole::new(
549            DeviceId::from_uuid(Uuid::from_u128(11)),
550            AuthorityId::new_from_entropy([11u8; 32]),
551            RoleIndex::new(1).expect("role index"),
552        );
553        let peer_b = ChoreographicRole::new(
554            DeviceId::from_uuid(Uuid::from_u128(12)),
555            AuthorityId::new_from_entropy([12u8; 32]),
556            RoleIndex::new(1).expect("role index"),
557        );
558
559        let task_a_effects = Arc::clone(&effects);
560        let task_a_barrier = Arc::clone(&barrier);
561        let mut tasks = tokio::task::JoinSet::new();
562        tasks.spawn(async move {
563            task_a_effects
564                .start_session(
565                    session_a,
566                    vec![authority_device_role(authority_id, 0), peer_a],
567                )
568                .await
569                .expect("session a starts");
570            task_a_barrier.wait().await;
571            assert_eq!(
572                task_a_effects.current_role(),
573                authority_device_role(authority_id, 0)
574            );
575            assert_eq!(task_a_effects.all_roles().len(), 2);
576            task_a_effects.set_timeout(111).await;
577            assert_eq!(task_a_effects.get_metrics().await.messages_sent, 0);
578            task_a_effects.end_session().await.expect("session a ends");
579        });
580
581        let task_b_effects = Arc::clone(&effects);
582        let task_b_barrier = Arc::clone(&barrier);
583        tasks.spawn(async move {
584            task_b_effects
585                .start_session(
586                    session_b,
587                    vec![authority_device_role(authority_id, 0), peer_b],
588                )
589                .await
590                .expect("session b starts");
591            task_b_barrier.wait().await;
592            assert_eq!(
593                task_b_effects.current_role(),
594                authority_device_role(authority_id, 0)
595            );
596            assert_eq!(task_b_effects.all_roles().len(), 2);
597            task_b_effects.set_timeout(222).await;
598            assert_eq!(task_b_effects.get_metrics().await.messages_received, 0);
599            task_b_effects.end_session().await.expect("session b ends");
600        });
601
602        barrier.wait().await;
603        tasks
604            .join_next()
605            .await
606            .expect("task a joined")
607            .expect("task a");
608        tasks
609            .join_next()
610            .await
611            .expect("task b joined")
612            .expect("task b");
613        assert_eq!(effects.choreography_state.read().active_session_count(), 0);
614    }
615
616    #[tokio::test]
617    async fn concurrent_session_sends_keep_guard_and_transport_contexts_isolated() {
618        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([13; 16]));
619        let effects = Arc::new(
620            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
621                &AgentConfig::default(),
622                authority_id,
623                crate::runtime::SharedTransport::new(),
624            )
625            .expect("testing effect system with shared transport"),
626        );
627        let barrier = Arc::new(Barrier::new(3));
628
629        let session_a = Uuid::from_u128(41);
630        let session_b = Uuid::from_u128(42);
631        let self_role = authority_device_role(authority_id, 0);
632        let loopback_peer = authority_device_role(authority_id, 1);
633
634        let task_a_effects = Arc::clone(&effects);
635        let task_a_barrier = Arc::clone(&barrier);
636        let mut tasks = tokio::task::JoinSet::new();
637        tasks.spawn(async move {
638            task_a_effects
639                .start_session(session_a, vec![self_role, loopback_peer])
640                .await
641                .expect("session a starts");
642            task_a_barrier.wait().await;
643            task_a_effects
644                .send_to_role_bytes(loopback_peer, b"alpha".to_vec())
645                .await
646                .expect("session a send succeeds");
647            task_a_effects.end_session().await.expect("session a ends");
648        });
649
650        let task_b_effects = Arc::clone(&effects);
651        let task_b_barrier = Arc::clone(&barrier);
652        tasks.spawn(async move {
653            task_b_effects
654                .start_session(session_b, vec![self_role, loopback_peer])
655                .await
656                .expect("session b starts");
657            task_b_barrier.wait().await;
658            task_b_effects
659                .send_to_role_bytes(loopback_peer, b"beta".to_vec())
660                .await
661                .expect("session b send succeeds");
662            task_b_effects.end_session().await.expect("session b ends");
663        });
664
665        barrier.wait().await;
666        tasks
667            .join_next()
668            .await
669            .expect("first task joined")
670            .expect("first task result");
671        tasks
672            .join_next()
673            .await
674            .expect("second task joined")
675            .expect("second task result");
676        let shared = effects
677            .transport
678            .shared_transport()
679            .expect("shared transport should be attached for the test");
680        let shared_inbox = shared.inbox_for(authority_id).read().clone();
681        let session_a_envelopes = shared_inbox
682            .iter()
683            .filter(|env| {
684                env.metadata
685                    .get("session-id")
686                    .is_some_and(|value| value == &session_a.to_string())
687            })
688            .cloned()
689            .collect::<Vec<_>>();
690        let session_b_envelopes = shared_inbox
691            .iter()
692            .filter(|env| {
693                env.metadata
694                    .get("session-id")
695                    .is_some_and(|value| value == &session_b.to_string())
696            })
697            .cloned()
698            .collect::<Vec<_>>();
699        assert_eq!(
700            session_a_envelopes.len(),
701            1,
702            "session a should queue one local send"
703        );
704        assert_eq!(
705            session_b_envelopes.len(),
706            1,
707            "session b should queue one local send"
708        );
709
710        let expected = [
711            (
712                session_a.to_string(),
713                ContextId::new_from_entropy(hash(session_a.as_bytes())),
714                session_a_envelopes,
715            ),
716            (
717                session_b.to_string(),
718                ContextId::new_from_entropy(hash(session_b.as_bytes())),
719                session_b_envelopes,
720            ),
721        ];
722
723        for (session_id, context_id, envelopes) in expected {
724            let envelope = envelopes
725                .iter()
726                .find(|env| {
727                    env.metadata
728                        .get("session-id")
729                        .is_some_and(|value| value == &session_id)
730                })
731                .expect("session envelope should be present");
732            assert_eq!(envelope.context, context_id);
733            assert_eq!(
734                envelope.receipt.as_ref().map(|receipt| receipt.context),
735                Some(context_id),
736                "guard/journal receipt context must remain session-scoped"
737            );
738        }
739    }
740
741    #[tokio::test]
742    async fn receive_filters_by_session_id_metadata() {
743        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([9; 16]));
744        let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([10; 16]));
745        let effects = test_effects(authority_id);
746        let session_id = Uuid::from_u128(33);
747        let wrong_session_id = Uuid::from_u128(34);
748        let self_role = authority_device_role(authority_id, 0);
749        let peer_role = authority_device_role(peer_authority, 1);
750
751        effects
752            .start_session(session_id, vec![self_role, peer_role])
753            .await
754            .expect("session starts");
755
756        let context_id = ContextId::new_from_entropy(hash(session_id.as_bytes()));
757        for (sid, payload) in [
758            (wrong_session_id, b"wrong".to_vec()),
759            (session_id, b"correct".to_vec()),
760        ] {
761            let mut metadata = HashMap::new();
762            metadata.insert(
763                "content-type".to_string(),
764                "application/aura-choreography".to_string(),
765            );
766            metadata.insert("session-id".to_string(), sid.to_string());
767            effects.requeue_envelope(TransportEnvelope {
768                destination: authority_id,
769                source: peer_authority,
770                context: context_id,
771                payload,
772                metadata,
773                receipt: None,
774            });
775        }
776        {
777            let state = effects.choreography_state.read();
778            assert_eq!(
779                state.session_inbox_len(RuntimeChoreographySessionId::from_uuid(wrong_session_id)),
780                1
781            );
782            assert_eq!(
783                state.session_inbox_len(RuntimeChoreographySessionId::from_uuid(session_id)),
784                1
785            );
786        }
787
788        assert_eq!(peer_role.authority_id, peer_authority);
789        let payload = take_session_envelope(
790            effects.as_ref(),
791            RuntimeChoreographySessionId::from_uuid(session_id),
792            peer_authority,
793            context_id,
794        )
795        .expect("session-scoped envelope should be available")
796        .payload;
797        assert_eq!(payload, b"correct".to_vec());
798        {
799            let state = effects.choreography_state.read();
800            assert_eq!(
801                state.session_inbox_len(RuntimeChoreographySessionId::from_uuid(wrong_session_id)),
802                1
803            );
804            assert_eq!(
805                state.session_inbox_len(RuntimeChoreographySessionId::from_uuid(session_id)),
806                0
807            );
808        }
809
810        effects.end_session().await.expect("session ends");
811    }
812
813    #[tokio::test]
814    async fn receive_waits_on_session_local_notify() {
815        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([11; 16]));
816        let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([12; 16]));
817        let effects = test_effects(authority_id);
818        let session_id = Uuid::from_u128(35);
819        let self_role = authority_device_role(authority_id, 0);
820        let peer_role = authority_device_role(peer_authority, 1);
821
822        effects
823            .start_session(session_id, vec![self_role, peer_role])
824            .await
825            .expect("session starts");
826
827        let context_id = ContextId::new_from_entropy(hash(session_id.as_bytes()));
828        let delayed_effects = Arc::clone(&effects);
829        let mut delayed_tasks = tokio::task::JoinSet::new();
830        delayed_tasks.spawn(async move {
831            delayed_effects.time_handler.sleep_ms(10).await;
832            let mut metadata = HashMap::new();
833            metadata.insert(
834                "content-type".to_string(),
835                "application/aura-choreography".to_string(),
836            );
837            metadata.insert("session-id".to_string(), session_id.to_string());
838            delayed_effects.requeue_envelope(TransportEnvelope {
839                destination: authority_id,
840                source: peer_authority,
841                context: context_id,
842                payload: b"notified".to_vec(),
843                metadata,
844                receipt: None,
845            });
846        });
847
848        let payload = assert_settles_within(
849            effects.receive_from_role_bytes(peer_role),
850            Duration::from_millis(40),
851            "session-local notify should wake receive before polling-sized timeout",
852        )
853        .await;
854        delayed_tasks
855            .join_next()
856            .await
857            .expect("enqueue task joined")
858            .expect("enqueue task");
859        let payload = payload.expect("session-scoped receive succeeds");
860        assert_eq!(payload, b"notified".to_vec());
861
862        effects.end_session().await.expect("session ends");
863    }
864
865    #[tokio::test]
866    async fn concurrent_inbound_delivery_remains_isolated_per_active_fragment() {
867        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([19; 16]));
868        let effects = test_effects(authority_id);
869        let barrier = Arc::new(Barrier::new(3));
870
871        let session_a = Uuid::from_u128(38);
872        let session_b = Uuid::from_u128(39);
873        let peer_a_authority = AuthorityId::from_uuid(Uuid::from_bytes([20; 16]));
874        let peer_b_authority = AuthorityId::from_uuid(Uuid::from_bytes([21; 16]));
875        let self_role = authority_device_role(authority_id, 0);
876        let peer_a_role = authority_device_role(peer_a_authority, 1);
877        let peer_b_role = authority_device_role(peer_b_authority, 1);
878
879        let task_a_effects = Arc::clone(&effects);
880        let task_a_barrier = Arc::clone(&barrier);
881        let mut tasks = tokio::task::JoinSet::new();
882        tasks.spawn(async move {
883            task_a_effects
884                .start_session(session_a, vec![self_role, peer_a_role])
885                .await
886                .expect("session a starts");
887            task_a_barrier.wait().await;
888            let payload = task_a_effects
889                .receive_from_role_bytes(peer_a_role)
890                .await
891                .expect("session a receive succeeds");
892            task_a_effects.end_session().await.expect("session a ends");
893            payload
894        });
895
896        let task_b_effects = Arc::clone(&effects);
897        let task_b_barrier = Arc::clone(&barrier);
898        tasks.spawn(async move {
899            task_b_effects
900                .start_session(session_b, vec![self_role, peer_b_role])
901                .await
902                .expect("session b starts");
903            task_b_barrier.wait().await;
904            let payload = task_b_effects
905                .receive_from_role_bytes(peer_b_role)
906                .await
907                .expect("session b receive succeeds");
908            task_b_effects.end_session().await.expect("session b ends");
909            payload
910        });
911
912        let enqueue = async {
913            barrier.wait().await;
914            for (session_id, source, payload) in [
915                (session_a, peer_a_authority, b"alpha".to_vec()),
916                (session_b, peer_b_authority, b"beta".to_vec()),
917            ] {
918                let mut metadata = HashMap::new();
919                metadata.insert(
920                    "content-type".to_string(),
921                    "application/aura-choreography".to_string(),
922                );
923                metadata.insert("session-id".to_string(), session_id.to_string());
924                effects.requeue_envelope(TransportEnvelope {
925                    destination: authority_id,
926                    source,
927                    context: ContextId::new_from_entropy(hash(session_id.as_bytes())),
928                    payload,
929                    metadata,
930                    receipt: None,
931                });
932            }
933        };
934
935        enqueue.await;
936        let first = tasks
937            .join_next()
938            .await
939            .expect("first receive task joined")
940            .expect("first receive task");
941        let second = tasks
942            .join_next()
943            .await
944            .expect("second receive task joined")
945            .expect("second receive task");
946        assert!(matches!(
947            (first.as_slice(), second.as_slice()),
948            (b"alpha", b"beta") | (b"beta", b"alpha")
949        ));
950        assert_eq!(effects.choreography_state.read().active_session_count(), 0);
951    }
952
953    #[tokio::test]
954    async fn session_sends_include_protocol_and_device_routing_metadata() {
955        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([0x71; 16]));
956        let effects = Arc::new(
957            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
958                &AgentConfig::default(),
959                authority_id,
960                crate::runtime::SharedTransport::new(),
961            )
962            .expect("testing effect system with shared transport"),
963        );
964        let session_id = Uuid::from_u128(0x7172);
965        let self_role = authority_device_role(authority_id, 0);
966        let loopback_peer = authority_device_role(authority_id, 1);
967
968        effects
969            .start_session(session_id, vec![self_role, loopback_peer])
970            .await
971            .expect("session starts");
972        effects
973            .set_current_runtime_choreography_protocol_id("aura.test.protocol")
974            .expect("protocol id attaches to current session");
975        effects
976            .send_to_role_bytes(loopback_peer, b"hello".to_vec())
977            .await
978            .expect("send succeeds");
979
980        let shared = effects
981            .transport
982            .shared_transport()
983            .expect("shared transport should be attached for the test");
984        let envelope = shared
985            .inbox_for(authority_id)
986            .read()
987            .first()
988            .cloned()
989            .expect("loopback send should queue one envelope");
990        let session_id_string = session_id.to_string();
991        let source_device_string = self_role.device_id.to_string();
992        let destination_device_string = loopback_peer.device_id.to_string();
993
994        assert_eq!(
995            envelope.metadata.get("content-type").map(String::as_str),
996            Some("application/aura-choreography")
997        );
998        assert_eq!(
999            envelope.metadata.get("session-id").map(String::as_str),
1000            Some(session_id_string.as_str())
1001        );
1002        assert_eq!(
1003            envelope.metadata.get("protocol-id").map(String::as_str),
1004            Some("aura.test.protocol")
1005        );
1006        assert_eq!(
1007            envelope
1008                .metadata
1009                .get("aura-source-device-id")
1010                .map(String::as_str),
1011            Some(source_device_string.as_str())
1012        );
1013        assert_eq!(
1014            envelope
1015                .metadata
1016                .get("aura-destination-device-id")
1017                .map(String::as_str),
1018            Some(destination_device_string.as_str())
1019        );
1020
1021        effects.end_session().await.expect("session ends");
1022    }
1023
1024    #[tokio::test]
1025    async fn async_ingress_reordering_preserves_communication_identity() {
1026        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([0x61; 16]));
1027        let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([0x62; 16]));
1028        let effects = test_effects(authority_id);
1029        let session_id = Uuid::from_u128(0x6162);
1030        let self_role = authority_device_role(authority_id, 0);
1031        let peer_role = authority_device_role(peer_authority, 1);
1032
1033        effects
1034            .start_session(session_id, vec![self_role, peer_role])
1035            .await
1036            .expect("session starts");
1037
1038        let context_id = ContextId::new_from_entropy(hash(session_id.as_bytes()));
1039        for (message_id, replay_key, payload) in [
1040            ("msg-2", "replay-2", b"second".to_vec()),
1041            ("msg-1", "replay-1", b"first".to_vec()),
1042        ] {
1043            let mut metadata = HashMap::new();
1044            metadata.insert(
1045                "content-type".to_string(),
1046                "application/aura-choreography".to_string(),
1047            );
1048            metadata.insert("session-id".to_string(), session_id.to_string());
1049            metadata.insert("message-id".to_string(), message_id.to_string());
1050            metadata.insert("replay-key".to_string(), replay_key.to_string());
1051            effects.requeue_envelope(TransportEnvelope {
1052                destination: authority_id,
1053                source: peer_authority,
1054                context: context_id,
1055                payload,
1056                metadata,
1057                receipt: None,
1058            });
1059        }
1060
1061        let session_runtime_id = RuntimeChoreographySessionId::from_uuid(session_id);
1062        let snapshot = effects
1063            .choreography_state
1064            .read()
1065            .session_inbox_snapshot(session_runtime_id);
1066        let identities = snapshot
1067            .iter()
1068            .map(|envelope| {
1069                (
1070                    envelope
1071                        .metadata
1072                        .get("message-id")
1073                        .cloned()
1074                        .expect("message id preserved"),
1075                    envelope
1076                        .metadata
1077                        .get("replay-key")
1078                        .cloned()
1079                        .expect("replay key preserved"),
1080                    envelope.payload.clone(),
1081                )
1082            })
1083            .collect::<Vec<_>>();
1084        assert_eq!(
1085            identities,
1086            vec![
1087                ("msg-2".to_string(), "replay-2".to_string(), b"second".to_vec()),
1088                ("msg-1".to_string(), "replay-1".to_string(), b"first".to_vec()),
1089            ],
1090            "host ingress reordering may change arrival order, but communication identity must survive unchanged"
1091        );
1092
1093        for expected in [
1094            ("msg-2", "replay-2", b"second".to_vec()),
1095            ("msg-1", "replay-1", b"first".to_vec()),
1096        ] {
1097            let envelope = take_session_envelope(
1098                effects.as_ref(),
1099                session_runtime_id,
1100                peer_authority,
1101                context_id,
1102            )
1103            .expect("session envelope should be available");
1104            assert_eq!(
1105                envelope.metadata.get("message-id").map(String::as_str),
1106                Some(expected.0)
1107            );
1108            assert_eq!(
1109                envelope.metadata.get("replay-key").map(String::as_str),
1110                Some(expected.1)
1111            );
1112            assert_eq!(envelope.payload, expected.2);
1113        }
1114
1115        effects.end_session().await.expect("session ends");
1116    }
1117
1118    #[tokio::test]
1119    async fn receive_reports_timeout_without_polling_loop() {
1120        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([17; 16]));
1121        let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([18; 16]));
1122        let effects = test_effects(authority_id);
1123        let session_id = Uuid::from_u128(36);
1124        let self_role = authority_device_role(authority_id, 0);
1125        let peer_role = authority_device_role(peer_authority, 1);
1126
1127        effects
1128            .start_session(session_id, vec![self_role, peer_role])
1129            .await
1130            .expect("session starts");
1131        effects.set_timeout(20).await;
1132
1133        let error = assert_settles_within(
1134            effects.receive_from_role_bytes(peer_role),
1135            Duration::from_millis(100),
1136            "receive should resolve with a timeout error",
1137        )
1138        .await
1139        .expect_err("receive should time out");
1140        assert!(matches!(
1141            error,
1142            ChoreographyError::Transport { source }
1143                if source
1144                    .downcast_ref::<aura_core::effects::TransportError>()
1145                    .is_some_and(|inner| matches!(inner, aura_core::effects::TransportError::NoMessage))
1146        ));
1147        assert_eq!(effects.get_metrics().await.timeout_count, 1);
1148
1149        effects.end_session().await.expect("session ends");
1150    }
1151
1152    #[tokio::test]
1153    async fn receive_returns_session_not_started_when_session_is_cancelled() {
1154        let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([15; 16]));
1155        let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([16; 16]));
1156        let effects = test_effects(authority_id);
1157        let session_id = Uuid::from_u128(37);
1158        let runtime_session_id = RuntimeChoreographySessionId::from_uuid(session_id);
1159        let self_role = authority_device_role(authority_id, 0);
1160        let peer_role = authority_device_role(peer_authority, 1);
1161
1162        effects
1163            .start_session(session_id, vec![self_role, peer_role])
1164            .await
1165            .expect("session starts");
1166
1167        let delayed_effects = Arc::clone(&effects);
1168        let mut delayed_tasks = tokio::task::JoinSet::new();
1169        delayed_tasks.spawn(async move {
1170            delayed_effects.time_handler.sleep_ms(10).await;
1171            delayed_effects
1172                .choreography_state
1173                .write()
1174                .cancel_session(runtime_session_id);
1175        });
1176
1177        let error = assert_settles_within(
1178            effects.receive_from_role_bytes(peer_role),
1179            Duration::from_millis(100),
1180            "receive should resolve when session is cancelled",
1181        )
1182        .await;
1183        delayed_tasks
1184            .join_next()
1185            .await
1186            .expect("cancel task joined")
1187            .expect("cancel task");
1188        let error = error.expect_err("receive should fail when session is cancelled");
1189        assert!(matches!(error, ChoreographyError::SessionNotStarted));
1190    }
1191}