1use super::{AuraEffectSystem, CHOREO_FLOW_COST_PER_KB, DEFAULT_CHOREO_FLOW_COST};
2use async_trait::async_trait;
3use aura_chat::capabilities::ChatCapability;
4use aura_core::effects::transport::{TransportEnvelope, TransportReceipt};
5use aura_core::effects::{PhysicalTimeEffects, TransportEffects, WakeCondition};
6use aura_core::hash::hash;
7use aura_core::{AuthorityId, ContextId, FlowCost};
8use aura_guards::prelude::create_send_guard_op;
9use aura_guards::{GuardOperation, JournalCoupler};
10use aura_protocol::effects::{
11 ChoreographicEffects, ChoreographicRole, ChoreographyError, ChoreographyEvent,
12 ChoreographyMetrics, RoleIndex,
13};
14use std::collections::HashMap;
15
16use crate::runtime::subsystems::choreography::RuntimeChoreographySessionId;
17use crate::runtime::subsystems::choreography::SessionStartError;
18
19fn current_session_snapshot(
20 effects: &AuraEffectSystem,
21) -> Result<crate::runtime::subsystems::choreography::ChoreographySessionState, ChoreographyError> {
22 effects
23 .choreography_state
24 .read()
25 .current_session()
26 .ok_or(ChoreographyError::SessionNotStarted)
27}
28
29fn take_session_envelope(
30 effects: &AuraEffectSystem,
31 session_id: RuntimeChoreographySessionId,
32 source: AuthorityId,
33 context: ContextId,
34) -> Option<TransportEnvelope> {
35 let self_device_id = effects.config.device_id.to_string();
36 effects
37 .choreography_state
38 .write()
39 .take_matching_session_envelope(
40 session_id,
41 source,
42 context,
43 effects.authority_id,
44 &self_device_id,
45 )
46}
47
48fn promote_shared_session_envelopes(
49 effects: &AuraEffectSystem,
50 session_id: RuntimeChoreographySessionId,
51) {
52 let Some(shared) = effects.transport.shared_transport() else {
53 return;
54 };
55 let session_ref = session_id.to_string();
56 let inbox = shared.inbox_for(effects.authority_id);
57 let mut inbox = inbox.write();
58 let mut promoted = Vec::new();
59
60 let mut index = 0usize;
61 while index < inbox.len() {
62 let matches_session = inbox[index]
63 .metadata
64 .get("content-type")
65 .is_some_and(|value| value == "application/aura-choreography")
66 && inbox[index]
67 .metadata
68 .get("session-id")
69 .is_some_and(|value| value == &session_ref);
70 if matches_session {
71 promoted.push(inbox.remove(index));
72 } else {
73 index += 1;
74 }
75 }
76 drop(inbox);
77
78 if promoted.is_empty() {
79 return;
80 }
81
82 let mut state = effects.choreography_state.write();
83 for envelope in promoted {
84 state.queue_session_envelope(session_id, envelope);
85 }
86}
87
88#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
90#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
91impl ChoreographicEffects for AuraEffectSystem {
92 async fn send_to_role_bytes(
93 &self,
94 role: ChoreographicRole,
95 message: Vec<u8>,
96 ) -> Result<(), ChoreographyError> {
97 let session = current_session_snapshot(self)?;
98 let context_id = session.context_id;
99 let current_role = session.current_role;
100
101 let peer = role.authority_id;
102 tracing::debug!(
103 session_id = %session.session_id,
104 from = ?current_role.device_id,
105 to = ?role.device_id,
106 peer = %peer,
107 ?context_id,
108 bytes = message.len(),
109 "choreography send"
110 );
111 let kb_units = ((message.len() as u32).saturating_add(1023)) / 1024;
112 let flow_cost = DEFAULT_CHOREO_FLOW_COST
113 .saturating_add(kb_units.saturating_mul(CHOREO_FLOW_COST_PER_KB));
114
115 let guard_chain = create_send_guard_op(
116 GuardOperation::Custom(ChatCapability::MessageSend.as_name().to_string()),
117 context_id,
118 peer,
119 FlowCost::new(flow_cost),
120 )
121 .with_operation_id(format!(
122 "choreography_send_{}_{}_{:?}",
123 session.session_id, context_id, role.device_id
124 ));
125
126 let guard_result =
127 guard_chain
128 .evaluate(self)
129 .await
130 .map_err(|e| ChoreographyError::InternalError {
131 message: format!("Choreography send guard failed: {e}"),
132 })?;
133
134 if !guard_result.authorized {
135 return Err(ChoreographyError::InternalError {
136 message: guard_result
137 .denial_reason
138 .unwrap_or_else(|| "Choreography send denied by guard chain".to_string()),
139 });
140 }
141
142 JournalCoupler::new()
143 .couple_with_send(self, &guard_result.receipt)
144 .await
145 .map_err(|e| ChoreographyError::InternalError {
146 message: format!("Choreography journal coupling failed: {e}"),
147 })?;
148
149 let transport_receipt = guard_result
150 .receipt
151 .as_ref()
152 .map(|receipt| TransportReceipt {
153 context: receipt.ctx,
154 src: receipt.src,
155 dst: receipt.dst,
156 epoch: receipt.epoch.value(),
157 cost: receipt.cost.value(),
158 nonce: receipt.nonce.value(),
159 prev: receipt.prev.0,
160 sig: receipt.sig.clone().into_bytes(),
161 });
162
163 let mut metadata = HashMap::new();
165 metadata.insert(
166 "content-type".to_string(),
167 "application/aura-choreography".to_string(),
168 );
169 metadata.insert("session-id".to_string(), session.session_id.to_string());
170 metadata.insert(
171 "aura-source-device-id".to_string(),
172 current_role.device_id.to_string(),
173 );
174 metadata.insert(
175 "aura-destination-device-id".to_string(),
176 role.device_id.to_string(),
177 );
178 if let Some(protocol_id) = session.protocol_id.as_ref() {
179 metadata.insert("protocol-id".to_string(), protocol_id.clone());
180 }
181
182 let envelope = TransportEnvelope {
183 destination: peer,
184 source: current_role.authority_id,
185 context: context_id,
186 payload: message,
187 metadata,
188 receipt: transport_receipt,
189 };
190
191 TransportEffects::send_envelope(self, envelope)
192 .await
193 .map_err(|e| ChoreographyError::Transport {
194 source: Box::new(e),
195 })?;
196
197 {
198 let mut state = self.choreography_state.write();
199 state
200 .with_current_session_mut(|session| {
201 session.metrics.messages_sent = session.metrics.messages_sent.saturating_add(1);
202 })
203 .map_err(|message| ChoreographyError::InternalError { message })?;
204 }
205 Ok(())
206 }
207
208 async fn receive_from_role_bytes(
209 &self,
210 role: ChoreographicRole,
211 ) -> Result<Vec<u8>, ChoreographyError> {
212 let session = current_session_snapshot(self)?;
213 let context_id = session.context_id;
214 let session_id = session.session_id;
215 let session_inbox_notify = self
216 .choreography_state
217 .read()
218 .session_inbox_notify(session_id);
219 let shared_inbox_notify = self
220 .transport
221 .shared_transport()
222 .map(|shared| shared.inbox_notify(self.authority_id));
223
224 let timeout_ms = session.timeout_ms.unwrap_or(5000);
227 let timeout_handle = self
228 .time_handler
229 .set_timeout(timeout_ms)
230 .await
231 .map_err(|error| ChoreographyError::InternalError {
232 message: format!("failed to issue receive timeout witness: {error}"),
233 })?;
234
235 let source_authority = role.authority_id;
236 tracing::debug!(
237 session_id = %session_id,
238 "Choreography receive: waiting for message from {:?} (authority {:?}) in context {:?}, timeout={}ms",
239 role.device_id,
240 source_authority,
241 context_id,
242 timeout_ms
243 );
244
245 let envelope = loop {
246 if let Some(env) = take_session_envelope(self, session_id, source_authority, context_id)
247 {
248 self.transport.record_receive();
249 break env;
250 }
251
252 promote_shared_session_envelopes(self, session_id);
253 if let Some(env) = take_session_envelope(self, session_id, source_authority, context_id)
254 {
255 self.transport.record_receive();
256 break env;
257 }
258
259 let Some(session_inbox_notify) = session_inbox_notify.clone() else {
260 let _ = self.time_handler.cancel_timeout(timeout_handle).await;
261 return Err(ChoreographyError::InternalError {
262 message: format!(
263 "missing choreography inbox notifier for active session {session_id}"
264 ),
265 });
266 };
267
268 tokio::select! {
269 _ = session_inbox_notify.notified() => {}
270 _ = async {
271 if let Some(shared_inbox_notify) = shared_inbox_notify.clone() {
272 shared_inbox_notify.notified().await;
273 } else {
274 std::future::pending::<()>().await;
275 }
276 } => {}
277 timeout_result = self.time_handler.yield_until(WakeCondition::TimeoutExpired {
278 timeout_id: timeout_handle,
279 }) => {
280 if let Err(error) = timeout_result {
281 return Err(ChoreographyError::InternalError {
282 message: format!("receive timeout witness failed: {error}"),
283 });
284 }
285 let mut state = self.choreography_state.write();
286 let _ = state.with_current_session_mut(|session| {
287 session.metrics.timeout_count = session.metrics.timeout_count.saturating_add(1);
288 });
289 return Err(ChoreographyError::Transport {
290 source: Box::new(aura_core::effects::TransportError::NoMessage),
291 });
292 }
293 }
294
295 if !self.choreography_state.read().is_active() {
296 let _ = self.time_handler.cancel_timeout(timeout_handle).await;
297 return Err(ChoreographyError::SessionNotStarted);
298 }
299 if self
300 .choreography_state
301 .read()
302 .current_session_id()
303 .is_some_and(|active| active != session_id)
304 {
305 let _ = self.time_handler.cancel_timeout(timeout_handle).await;
306 return Err(ChoreographyError::InternalError {
307 message: format!(
308 "choreography session binding changed while waiting for receive: {session_id}"
309 ),
310 });
311 }
312 };
313
314 let _ = self.time_handler.cancel_timeout(timeout_handle).await;
315
316 {
317 let mut state = self.choreography_state.write();
318 state
319 .with_current_session_mut(|session| {
320 session.metrics.messages_received =
321 session.metrics.messages_received.saturating_add(1);
322 })
323 .map_err(|message| ChoreographyError::InternalError { message })?;
324 }
325
326 Ok(envelope.payload)
327 }
328
329 async fn broadcast_bytes(&self, message: Vec<u8>) -> Result<(), ChoreographyError> {
330 let session = current_session_snapshot(self)?;
331 let roles = session.roles.clone();
332 let current_role = session.current_role;
333
334 for role in roles {
335 if role == current_role {
336 continue;
337 }
338 self.send_to_role_bytes(role, message.clone()).await?;
339 }
340
341 Ok(())
342 }
343
344 #[allow(clippy::disallowed_methods)]
345 fn current_role(&self) -> ChoreographicRole {
346 current_session_snapshot(self).map_or_else(
347 |_| {
348 let role_index = RoleIndex::new(0).expect("role index");
349 ChoreographicRole::with_authority(
350 self.config.device_id(),
351 self.authority_id,
352 role_index,
353 )
354 },
355 |session| session.current_role,
356 )
357 }
358
359 fn all_roles(&self) -> Vec<ChoreographicRole> {
360 current_session_snapshot(self).map_or_else(
361 |_| vec![self.current_role()],
362 |session| {
363 if session.roles.is_empty() {
364 vec![self.current_role()]
365 } else {
366 session.roles
367 }
368 },
369 )
370 }
371
372 async fn is_role_active(&self, role: ChoreographicRole) -> bool {
373 let context_id = match current_session_snapshot(self) {
374 Ok(session) => session.context_id,
375 Err(_) => return false,
376 };
377
378 TransportEffects::is_channel_established(self, context_id, role.authority_id).await
379 }
380
381 async fn start_session(
382 &self,
383 session_id: uuid::Uuid,
384 roles: Vec<ChoreographicRole>,
385 ) -> Result<(), ChoreographyError> {
386 let runtime_session_id = RuntimeChoreographySessionId::from_uuid(session_id);
387 let current_device = self.config.device_id();
388 let current_role = roles
389 .iter()
390 .find(|role| role.device_id == current_device)
391 .or_else(|| {
392 roles
393 .iter()
394 .find(|role| role.authority_id == self.authority_id)
395 })
396 .copied()
397 .ok_or_else(|| {
398 let role_index = RoleIndex::new(0).expect("role index");
399 ChoreographyError::RoleNotFound {
400 role: ChoreographicRole::with_authority(
401 current_device,
402 self.authority_id,
403 role_index,
404 ),
405 }
406 })?;
407
408 let context_id = ContextId::new_from_entropy(hash(session_id.as_bytes()));
411 tracing::debug!(
412 "Choreography start_session: session_id={}, context_id={:?}, authority={:?}, roles={:?}",
413 runtime_session_id,
414 context_id,
415 self.authority_id,
416 roles.iter().map(|r| r.device_id).collect::<Vec<_>>()
417 );
418 let started_at_ms = self
419 .physical_time()
420 .await
421 .map(|time| time.ts_ms)
422 .unwrap_or_default();
423
424 let mut state = self.choreography_state.write();
425 state
426 .start_session(
427 runtime_session_id,
428 None,
429 context_id,
430 roles,
431 current_role,
432 None,
433 started_at_ms,
434 )
435 .map_err(|error| match error {
436 SessionStartError::SessionAlreadyExists { .. } => {
437 ChoreographyError::SessionAlreadyExists { session_id }
438 }
439 SessionStartError::TaskAlreadyBound { .. } => ChoreographyError::InternalError {
440 message: error.to_string(),
441 },
442 })
443 }
444
445 async fn end_session(&self) -> Result<(), ChoreographyError> {
446 let ended_at_ms = self
447 .physical_time()
448 .await
449 .map(|time| time.ts_ms)
450 .unwrap_or_default();
451
452 let mut state = self.choreography_state.write();
453 let ended_session_id = state
454 .end_session(ended_at_ms)
455 .map_err(|_| ChoreographyError::SessionNotStarted)?;
456 drop(state);
457 let _released_fragments = self.release_vm_fragments_for_session(ended_session_id);
458 Ok(())
459 }
460
461 async fn emit_choreo_event(&self, event: ChoreographyEvent) -> Result<(), ChoreographyError> {
462 tracing::debug!(?event, "choreography event");
463 Ok(())
464 }
465
466 async fn set_timeout(&self, timeout_ms: u64) {
467 let mut state = self.choreography_state.write();
468 let _ = state.with_current_session_mut(|session| {
469 session.timeout_ms = Some(timeout_ms);
470 });
471 }
472
473 async fn get_metrics(&self) -> ChoreographyMetrics {
474 current_session_snapshot(self).map_or_else(|_| default_metrics(), |session| session.metrics)
475 }
476}
477
478fn default_metrics() -> ChoreographyMetrics {
479 ChoreographyMetrics {
480 messages_sent: 0,
481 messages_received: 0,
482 avg_latency_ms: 0.0,
483 timeout_count: 0,
484 retry_count: 0,
485 total_duration_ms: 0,
486 }
487}
488
489#[cfg(test)]
490mod tests {
491 use super::*;
492 use crate::core::AgentConfig;
493 use aura_core::DeviceId;
494 use std::sync::Arc;
495 use std::time::Duration;
496 use tokio::sync::Barrier;
497 use uuid::Uuid;
498
499 async fn assert_settles_within<T, E: std::fmt::Debug>(
500 future: impl std::future::Future<Output = Result<T, E>>,
501 timeout: Duration,
502 message: &str,
503 ) -> Result<T, E> {
504 let time = aura_effects::time::PhysicalTimeHandler::new();
505 let started_at = time
506 .physical_time()
507 .await
508 .expect("physical time should be available");
509 let budget = aura_core::TimeoutBudget::from_start_and_timeout(&started_at, timeout)
510 .expect("timeout budget should fit");
511 match aura_core::execute_with_timeout_budget(&time, &budget, || future).await {
512 Ok(value) => Ok(value),
513 Err(aura_core::TimeoutRunError::Operation(error)) => Err(error),
514 Err(aura_core::TimeoutRunError::Timeout(error)) => {
515 panic!("{message}: timeout budget exceeded: {error:?}")
516 }
517 }
518 }
519
520 fn test_effects(authority_id: AuthorityId) -> Arc<AuraEffectSystem> {
521 let authority_bytes = authority_id.to_bytes();
522 let seed_salt = u64::from_le_bytes(authority_bytes[..8].try_into().expect("salt bytes"));
523 Arc::new(
524 AuraEffectSystem::simulation_for_test_for_authority_with_salt(
525 &AgentConfig::default(),
526 authority_id,
527 seed_salt,
528 )
529 .expect("testing effect system"),
530 )
531 }
532
533 fn authority_device_role(authority_id: AuthorityId, role_index: u16) -> ChoreographicRole {
534 ChoreographicRole::for_authority(
535 authority_id,
536 RoleIndex::new(role_index.into()).expect("role index"),
537 )
538 }
539
540 #[tokio::test]
541 async fn concurrent_sessions_are_isolated_per_task() {
542 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([7; 16]));
543 let effects = test_effects(authority_id);
544 let barrier = Arc::new(Barrier::new(3));
545
546 let session_a = Uuid::from_u128(1);
547 let session_b = Uuid::from_u128(2);
548 let peer_a = ChoreographicRole::new(
549 DeviceId::from_uuid(Uuid::from_u128(11)),
550 AuthorityId::new_from_entropy([11u8; 32]),
551 RoleIndex::new(1).expect("role index"),
552 );
553 let peer_b = ChoreographicRole::new(
554 DeviceId::from_uuid(Uuid::from_u128(12)),
555 AuthorityId::new_from_entropy([12u8; 32]),
556 RoleIndex::new(1).expect("role index"),
557 );
558
559 let task_a_effects = Arc::clone(&effects);
560 let task_a_barrier = Arc::clone(&barrier);
561 let mut tasks = tokio::task::JoinSet::new();
562 tasks.spawn(async move {
563 task_a_effects
564 .start_session(
565 session_a,
566 vec![authority_device_role(authority_id, 0), peer_a],
567 )
568 .await
569 .expect("session a starts");
570 task_a_barrier.wait().await;
571 assert_eq!(
572 task_a_effects.current_role(),
573 authority_device_role(authority_id, 0)
574 );
575 assert_eq!(task_a_effects.all_roles().len(), 2);
576 task_a_effects.set_timeout(111).await;
577 assert_eq!(task_a_effects.get_metrics().await.messages_sent, 0);
578 task_a_effects.end_session().await.expect("session a ends");
579 });
580
581 let task_b_effects = Arc::clone(&effects);
582 let task_b_barrier = Arc::clone(&barrier);
583 tasks.spawn(async move {
584 task_b_effects
585 .start_session(
586 session_b,
587 vec![authority_device_role(authority_id, 0), peer_b],
588 )
589 .await
590 .expect("session b starts");
591 task_b_barrier.wait().await;
592 assert_eq!(
593 task_b_effects.current_role(),
594 authority_device_role(authority_id, 0)
595 );
596 assert_eq!(task_b_effects.all_roles().len(), 2);
597 task_b_effects.set_timeout(222).await;
598 assert_eq!(task_b_effects.get_metrics().await.messages_received, 0);
599 task_b_effects.end_session().await.expect("session b ends");
600 });
601
602 barrier.wait().await;
603 tasks
604 .join_next()
605 .await
606 .expect("task a joined")
607 .expect("task a");
608 tasks
609 .join_next()
610 .await
611 .expect("task b joined")
612 .expect("task b");
613 assert_eq!(effects.choreography_state.read().active_session_count(), 0);
614 }
615
616 #[tokio::test]
617 async fn concurrent_session_sends_keep_guard_and_transport_contexts_isolated() {
618 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([13; 16]));
619 let effects = Arc::new(
620 AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
621 &AgentConfig::default(),
622 authority_id,
623 crate::runtime::SharedTransport::new(),
624 )
625 .expect("testing effect system with shared transport"),
626 );
627 let barrier = Arc::new(Barrier::new(3));
628
629 let session_a = Uuid::from_u128(41);
630 let session_b = Uuid::from_u128(42);
631 let self_role = authority_device_role(authority_id, 0);
632 let loopback_peer = authority_device_role(authority_id, 1);
633
634 let task_a_effects = Arc::clone(&effects);
635 let task_a_barrier = Arc::clone(&barrier);
636 let mut tasks = tokio::task::JoinSet::new();
637 tasks.spawn(async move {
638 task_a_effects
639 .start_session(session_a, vec![self_role, loopback_peer])
640 .await
641 .expect("session a starts");
642 task_a_barrier.wait().await;
643 task_a_effects
644 .send_to_role_bytes(loopback_peer, b"alpha".to_vec())
645 .await
646 .expect("session a send succeeds");
647 task_a_effects.end_session().await.expect("session a ends");
648 });
649
650 let task_b_effects = Arc::clone(&effects);
651 let task_b_barrier = Arc::clone(&barrier);
652 tasks.spawn(async move {
653 task_b_effects
654 .start_session(session_b, vec![self_role, loopback_peer])
655 .await
656 .expect("session b starts");
657 task_b_barrier.wait().await;
658 task_b_effects
659 .send_to_role_bytes(loopback_peer, b"beta".to_vec())
660 .await
661 .expect("session b send succeeds");
662 task_b_effects.end_session().await.expect("session b ends");
663 });
664
665 barrier.wait().await;
666 tasks
667 .join_next()
668 .await
669 .expect("first task joined")
670 .expect("first task result");
671 tasks
672 .join_next()
673 .await
674 .expect("second task joined")
675 .expect("second task result");
676 let shared = effects
677 .transport
678 .shared_transport()
679 .expect("shared transport should be attached for the test");
680 let shared_inbox = shared.inbox_for(authority_id).read().clone();
681 let session_a_envelopes = shared_inbox
682 .iter()
683 .filter(|env| {
684 env.metadata
685 .get("session-id")
686 .is_some_and(|value| value == &session_a.to_string())
687 })
688 .cloned()
689 .collect::<Vec<_>>();
690 let session_b_envelopes = shared_inbox
691 .iter()
692 .filter(|env| {
693 env.metadata
694 .get("session-id")
695 .is_some_and(|value| value == &session_b.to_string())
696 })
697 .cloned()
698 .collect::<Vec<_>>();
699 assert_eq!(
700 session_a_envelopes.len(),
701 1,
702 "session a should queue one local send"
703 );
704 assert_eq!(
705 session_b_envelopes.len(),
706 1,
707 "session b should queue one local send"
708 );
709
710 let expected = [
711 (
712 session_a.to_string(),
713 ContextId::new_from_entropy(hash(session_a.as_bytes())),
714 session_a_envelopes,
715 ),
716 (
717 session_b.to_string(),
718 ContextId::new_from_entropy(hash(session_b.as_bytes())),
719 session_b_envelopes,
720 ),
721 ];
722
723 for (session_id, context_id, envelopes) in expected {
724 let envelope = envelopes
725 .iter()
726 .find(|env| {
727 env.metadata
728 .get("session-id")
729 .is_some_and(|value| value == &session_id)
730 })
731 .expect("session envelope should be present");
732 assert_eq!(envelope.context, context_id);
733 assert_eq!(
734 envelope.receipt.as_ref().map(|receipt| receipt.context),
735 Some(context_id),
736 "guard/journal receipt context must remain session-scoped"
737 );
738 }
739 }
740
741 #[tokio::test]
742 async fn receive_filters_by_session_id_metadata() {
743 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([9; 16]));
744 let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([10; 16]));
745 let effects = test_effects(authority_id);
746 let session_id = Uuid::from_u128(33);
747 let wrong_session_id = Uuid::from_u128(34);
748 let self_role = authority_device_role(authority_id, 0);
749 let peer_role = authority_device_role(peer_authority, 1);
750
751 effects
752 .start_session(session_id, vec![self_role, peer_role])
753 .await
754 .expect("session starts");
755
756 let context_id = ContextId::new_from_entropy(hash(session_id.as_bytes()));
757 for (sid, payload) in [
758 (wrong_session_id, b"wrong".to_vec()),
759 (session_id, b"correct".to_vec()),
760 ] {
761 let mut metadata = HashMap::new();
762 metadata.insert(
763 "content-type".to_string(),
764 "application/aura-choreography".to_string(),
765 );
766 metadata.insert("session-id".to_string(), sid.to_string());
767 effects.requeue_envelope(TransportEnvelope {
768 destination: authority_id,
769 source: peer_authority,
770 context: context_id,
771 payload,
772 metadata,
773 receipt: None,
774 });
775 }
776 {
777 let state = effects.choreography_state.read();
778 assert_eq!(
779 state.session_inbox_len(RuntimeChoreographySessionId::from_uuid(wrong_session_id)),
780 1
781 );
782 assert_eq!(
783 state.session_inbox_len(RuntimeChoreographySessionId::from_uuid(session_id)),
784 1
785 );
786 }
787
788 assert_eq!(peer_role.authority_id, peer_authority);
789 let payload = take_session_envelope(
790 effects.as_ref(),
791 RuntimeChoreographySessionId::from_uuid(session_id),
792 peer_authority,
793 context_id,
794 )
795 .expect("session-scoped envelope should be available")
796 .payload;
797 assert_eq!(payload, b"correct".to_vec());
798 {
799 let state = effects.choreography_state.read();
800 assert_eq!(
801 state.session_inbox_len(RuntimeChoreographySessionId::from_uuid(wrong_session_id)),
802 1
803 );
804 assert_eq!(
805 state.session_inbox_len(RuntimeChoreographySessionId::from_uuid(session_id)),
806 0
807 );
808 }
809
810 effects.end_session().await.expect("session ends");
811 }
812
813 #[tokio::test]
814 async fn receive_waits_on_session_local_notify() {
815 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([11; 16]));
816 let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([12; 16]));
817 let effects = test_effects(authority_id);
818 let session_id = Uuid::from_u128(35);
819 let self_role = authority_device_role(authority_id, 0);
820 let peer_role = authority_device_role(peer_authority, 1);
821
822 effects
823 .start_session(session_id, vec![self_role, peer_role])
824 .await
825 .expect("session starts");
826
827 let context_id = ContextId::new_from_entropy(hash(session_id.as_bytes()));
828 let delayed_effects = Arc::clone(&effects);
829 let mut delayed_tasks = tokio::task::JoinSet::new();
830 delayed_tasks.spawn(async move {
831 delayed_effects.time_handler.sleep_ms(10).await;
832 let mut metadata = HashMap::new();
833 metadata.insert(
834 "content-type".to_string(),
835 "application/aura-choreography".to_string(),
836 );
837 metadata.insert("session-id".to_string(), session_id.to_string());
838 delayed_effects.requeue_envelope(TransportEnvelope {
839 destination: authority_id,
840 source: peer_authority,
841 context: context_id,
842 payload: b"notified".to_vec(),
843 metadata,
844 receipt: None,
845 });
846 });
847
848 let payload = assert_settles_within(
849 effects.receive_from_role_bytes(peer_role),
850 Duration::from_millis(40),
851 "session-local notify should wake receive before polling-sized timeout",
852 )
853 .await;
854 delayed_tasks
855 .join_next()
856 .await
857 .expect("enqueue task joined")
858 .expect("enqueue task");
859 let payload = payload.expect("session-scoped receive succeeds");
860 assert_eq!(payload, b"notified".to_vec());
861
862 effects.end_session().await.expect("session ends");
863 }
864
865 #[tokio::test]
866 async fn concurrent_inbound_delivery_remains_isolated_per_active_fragment() {
867 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([19; 16]));
868 let effects = test_effects(authority_id);
869 let barrier = Arc::new(Barrier::new(3));
870
871 let session_a = Uuid::from_u128(38);
872 let session_b = Uuid::from_u128(39);
873 let peer_a_authority = AuthorityId::from_uuid(Uuid::from_bytes([20; 16]));
874 let peer_b_authority = AuthorityId::from_uuid(Uuid::from_bytes([21; 16]));
875 let self_role = authority_device_role(authority_id, 0);
876 let peer_a_role = authority_device_role(peer_a_authority, 1);
877 let peer_b_role = authority_device_role(peer_b_authority, 1);
878
879 let task_a_effects = Arc::clone(&effects);
880 let task_a_barrier = Arc::clone(&barrier);
881 let mut tasks = tokio::task::JoinSet::new();
882 tasks.spawn(async move {
883 task_a_effects
884 .start_session(session_a, vec![self_role, peer_a_role])
885 .await
886 .expect("session a starts");
887 task_a_barrier.wait().await;
888 let payload = task_a_effects
889 .receive_from_role_bytes(peer_a_role)
890 .await
891 .expect("session a receive succeeds");
892 task_a_effects.end_session().await.expect("session a ends");
893 payload
894 });
895
896 let task_b_effects = Arc::clone(&effects);
897 let task_b_barrier = Arc::clone(&barrier);
898 tasks.spawn(async move {
899 task_b_effects
900 .start_session(session_b, vec![self_role, peer_b_role])
901 .await
902 .expect("session b starts");
903 task_b_barrier.wait().await;
904 let payload = task_b_effects
905 .receive_from_role_bytes(peer_b_role)
906 .await
907 .expect("session b receive succeeds");
908 task_b_effects.end_session().await.expect("session b ends");
909 payload
910 });
911
912 let enqueue = async {
913 barrier.wait().await;
914 for (session_id, source, payload) in [
915 (session_a, peer_a_authority, b"alpha".to_vec()),
916 (session_b, peer_b_authority, b"beta".to_vec()),
917 ] {
918 let mut metadata = HashMap::new();
919 metadata.insert(
920 "content-type".to_string(),
921 "application/aura-choreography".to_string(),
922 );
923 metadata.insert("session-id".to_string(), session_id.to_string());
924 effects.requeue_envelope(TransportEnvelope {
925 destination: authority_id,
926 source,
927 context: ContextId::new_from_entropy(hash(session_id.as_bytes())),
928 payload,
929 metadata,
930 receipt: None,
931 });
932 }
933 };
934
935 enqueue.await;
936 let first = tasks
937 .join_next()
938 .await
939 .expect("first receive task joined")
940 .expect("first receive task");
941 let second = tasks
942 .join_next()
943 .await
944 .expect("second receive task joined")
945 .expect("second receive task");
946 assert!(matches!(
947 (first.as_slice(), second.as_slice()),
948 (b"alpha", b"beta") | (b"beta", b"alpha")
949 ));
950 assert_eq!(effects.choreography_state.read().active_session_count(), 0);
951 }
952
953 #[tokio::test]
954 async fn session_sends_include_protocol_and_device_routing_metadata() {
955 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([0x71; 16]));
956 let effects = Arc::new(
957 AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
958 &AgentConfig::default(),
959 authority_id,
960 crate::runtime::SharedTransport::new(),
961 )
962 .expect("testing effect system with shared transport"),
963 );
964 let session_id = Uuid::from_u128(0x7172);
965 let self_role = authority_device_role(authority_id, 0);
966 let loopback_peer = authority_device_role(authority_id, 1);
967
968 effects
969 .start_session(session_id, vec![self_role, loopback_peer])
970 .await
971 .expect("session starts");
972 effects
973 .set_current_runtime_choreography_protocol_id("aura.test.protocol")
974 .expect("protocol id attaches to current session");
975 effects
976 .send_to_role_bytes(loopback_peer, b"hello".to_vec())
977 .await
978 .expect("send succeeds");
979
980 let shared = effects
981 .transport
982 .shared_transport()
983 .expect("shared transport should be attached for the test");
984 let envelope = shared
985 .inbox_for(authority_id)
986 .read()
987 .first()
988 .cloned()
989 .expect("loopback send should queue one envelope");
990 let session_id_string = session_id.to_string();
991 let source_device_string = self_role.device_id.to_string();
992 let destination_device_string = loopback_peer.device_id.to_string();
993
994 assert_eq!(
995 envelope.metadata.get("content-type").map(String::as_str),
996 Some("application/aura-choreography")
997 );
998 assert_eq!(
999 envelope.metadata.get("session-id").map(String::as_str),
1000 Some(session_id_string.as_str())
1001 );
1002 assert_eq!(
1003 envelope.metadata.get("protocol-id").map(String::as_str),
1004 Some("aura.test.protocol")
1005 );
1006 assert_eq!(
1007 envelope
1008 .metadata
1009 .get("aura-source-device-id")
1010 .map(String::as_str),
1011 Some(source_device_string.as_str())
1012 );
1013 assert_eq!(
1014 envelope
1015 .metadata
1016 .get("aura-destination-device-id")
1017 .map(String::as_str),
1018 Some(destination_device_string.as_str())
1019 );
1020
1021 effects.end_session().await.expect("session ends");
1022 }
1023
1024 #[tokio::test]
1025 async fn async_ingress_reordering_preserves_communication_identity() {
1026 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([0x61; 16]));
1027 let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([0x62; 16]));
1028 let effects = test_effects(authority_id);
1029 let session_id = Uuid::from_u128(0x6162);
1030 let self_role = authority_device_role(authority_id, 0);
1031 let peer_role = authority_device_role(peer_authority, 1);
1032
1033 effects
1034 .start_session(session_id, vec![self_role, peer_role])
1035 .await
1036 .expect("session starts");
1037
1038 let context_id = ContextId::new_from_entropy(hash(session_id.as_bytes()));
1039 for (message_id, replay_key, payload) in [
1040 ("msg-2", "replay-2", b"second".to_vec()),
1041 ("msg-1", "replay-1", b"first".to_vec()),
1042 ] {
1043 let mut metadata = HashMap::new();
1044 metadata.insert(
1045 "content-type".to_string(),
1046 "application/aura-choreography".to_string(),
1047 );
1048 metadata.insert("session-id".to_string(), session_id.to_string());
1049 metadata.insert("message-id".to_string(), message_id.to_string());
1050 metadata.insert("replay-key".to_string(), replay_key.to_string());
1051 effects.requeue_envelope(TransportEnvelope {
1052 destination: authority_id,
1053 source: peer_authority,
1054 context: context_id,
1055 payload,
1056 metadata,
1057 receipt: None,
1058 });
1059 }
1060
1061 let session_runtime_id = RuntimeChoreographySessionId::from_uuid(session_id);
1062 let snapshot = effects
1063 .choreography_state
1064 .read()
1065 .session_inbox_snapshot(session_runtime_id);
1066 let identities = snapshot
1067 .iter()
1068 .map(|envelope| {
1069 (
1070 envelope
1071 .metadata
1072 .get("message-id")
1073 .cloned()
1074 .expect("message id preserved"),
1075 envelope
1076 .metadata
1077 .get("replay-key")
1078 .cloned()
1079 .expect("replay key preserved"),
1080 envelope.payload.clone(),
1081 )
1082 })
1083 .collect::<Vec<_>>();
1084 assert_eq!(
1085 identities,
1086 vec![
1087 ("msg-2".to_string(), "replay-2".to_string(), b"second".to_vec()),
1088 ("msg-1".to_string(), "replay-1".to_string(), b"first".to_vec()),
1089 ],
1090 "host ingress reordering may change arrival order, but communication identity must survive unchanged"
1091 );
1092
1093 for expected in [
1094 ("msg-2", "replay-2", b"second".to_vec()),
1095 ("msg-1", "replay-1", b"first".to_vec()),
1096 ] {
1097 let envelope = take_session_envelope(
1098 effects.as_ref(),
1099 session_runtime_id,
1100 peer_authority,
1101 context_id,
1102 )
1103 .expect("session envelope should be available");
1104 assert_eq!(
1105 envelope.metadata.get("message-id").map(String::as_str),
1106 Some(expected.0)
1107 );
1108 assert_eq!(
1109 envelope.metadata.get("replay-key").map(String::as_str),
1110 Some(expected.1)
1111 );
1112 assert_eq!(envelope.payload, expected.2);
1113 }
1114
1115 effects.end_session().await.expect("session ends");
1116 }
1117
1118 #[tokio::test]
1119 async fn receive_reports_timeout_without_polling_loop() {
1120 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([17; 16]));
1121 let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([18; 16]));
1122 let effects = test_effects(authority_id);
1123 let session_id = Uuid::from_u128(36);
1124 let self_role = authority_device_role(authority_id, 0);
1125 let peer_role = authority_device_role(peer_authority, 1);
1126
1127 effects
1128 .start_session(session_id, vec![self_role, peer_role])
1129 .await
1130 .expect("session starts");
1131 effects.set_timeout(20).await;
1132
1133 let error = assert_settles_within(
1134 effects.receive_from_role_bytes(peer_role),
1135 Duration::from_millis(100),
1136 "receive should resolve with a timeout error",
1137 )
1138 .await
1139 .expect_err("receive should time out");
1140 assert!(matches!(
1141 error,
1142 ChoreographyError::Transport { source }
1143 if source
1144 .downcast_ref::<aura_core::effects::TransportError>()
1145 .is_some_and(|inner| matches!(inner, aura_core::effects::TransportError::NoMessage))
1146 ));
1147 assert_eq!(effects.get_metrics().await.timeout_count, 1);
1148
1149 effects.end_session().await.expect("session ends");
1150 }
1151
1152 #[tokio::test]
1153 async fn receive_returns_session_not_started_when_session_is_cancelled() {
1154 let authority_id = AuthorityId::from_uuid(Uuid::from_bytes([15; 16]));
1155 let peer_authority = AuthorityId::from_uuid(Uuid::from_bytes([16; 16]));
1156 let effects = test_effects(authority_id);
1157 let session_id = Uuid::from_u128(37);
1158 let runtime_session_id = RuntimeChoreographySessionId::from_uuid(session_id);
1159 let self_role = authority_device_role(authority_id, 0);
1160 let peer_role = authority_device_role(peer_authority, 1);
1161
1162 effects
1163 .start_session(session_id, vec![self_role, peer_role])
1164 .await
1165 .expect("session starts");
1166
1167 let delayed_effects = Arc::clone(&effects);
1168 let mut delayed_tasks = tokio::task::JoinSet::new();
1169 delayed_tasks.spawn(async move {
1170 delayed_effects.time_handler.sleep_ms(10).await;
1171 delayed_effects
1172 .choreography_state
1173 .write()
1174 .cancel_session(runtime_session_id);
1175 });
1176
1177 let error = assert_settles_within(
1178 effects.receive_from_role_bytes(peer_role),
1179 Duration::from_millis(100),
1180 "receive should resolve when session is cancelled",
1181 )
1182 .await;
1183 delayed_tasks
1184 .join_next()
1185 .await
1186 .expect("cancel task joined")
1187 .expect("cancel task");
1188 let error = error.expect_err("receive should fail when session is cancelled");
1189 assert!(matches!(error, ChoreographyError::SessionNotStarted));
1190 }
1191}