1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::{
3 atomic::{AtomicU64, Ordering},
4 Mutex,
5};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use crate::{
9 events::{
10 MeshPrivateControlPayload, MeshPrivateReceiptPayload, MeshQueryPayload, MeshReplyPayload,
11 MeshTopicPayload, MESH_PRIVATE_CONTROL, MESH_PRIVATE_RECEIPT, MESH_QUERY, MESH_REPLY,
12 MESH_TOPIC,
13 },
14 executor_contract::{
15 MeshControlEnvelope, MeshObjectPolicy, MeshObjectReadReason, MeshObjectResult,
16 MeshPrivateObjectRef, MeshStatusResult,
17 },
18 HostPushEvent,
19};
20
21pub fn generate_session_nonce() -> String {
35 use std::sync::atomic::{AtomicU64, Ordering};
36 static FALLBACK_CTR: AtomicU64 = AtomicU64::new(0);
37
38 let mut bytes = [0u8; 16];
39 if getrandom::getrandom(&mut bytes).is_err() {
40 log::warn!("getrandom failed — using fallback nonce (not cryptographically secure)");
41 let t = SystemTime::now()
42 .duration_since(UNIX_EPOCH)
43 .unwrap_or_default()
44 .as_nanos()
45 .to_le_bytes();
46 let ctr = FALLBACK_CTR.fetch_add(1, Ordering::Relaxed).to_le_bytes();
47 bytes[..8].copy_from_slice(&t[..8]);
48 bytes[8..].copy_from_slice(&ctr);
49 }
50 bytes.iter().map(|b| format!("{b:02x}")).collect()
51}
52
/// Interface a mesh transport exposes to the host.
///
/// `publish`, `subscribe`, `put_object`, `get_object_result`, `request`,
/// `respond` and `status` are required. Every other method has a default
/// body; the private-object and private-channel defaults all report that
/// the feature is unavailable (`false`, empty strings, or a `PolicyDenied`
/// result), and `supports_private` defaults to `false`.
pub trait MeshRuntime: Send + Sync + 'static {
    /// Publishes base64-encoded `data_base64` on `topic`.
    ///
    /// NOTE(review): the meaning of the returned flag is implementation-defined
    /// — presumably "accepted"; confirm against callers.
    fn publish(&self, topic: &str, data_base64: &str) -> bool;
    /// Registers interest in `topic`.
    fn subscribe(&self, topic: &str) -> bool;
    /// Stores `data_base64` at `path` under the given `policy`.
    fn put_object(&self, path: &str, data_base64: &str, policy: MeshObjectPolicy) -> bool;
    /// Reads the object at `path` as a structured [`MeshObjectResult`].
    fn get_object_result(&self, path: &str) -> MeshObjectResult;
    /// Convenience wrapper: returns `get_object_result(path).compat_data()`.
    fn get_object(&self, path: &str) -> Option<String> {
        self.get_object_result(path).compat_data()
    }
    /// Issues a query for `path` and returns a string identifying the request.
    fn request(&self, path: &str, data_base64: &str) -> String;
    /// Answers the request identified by `request_id` with `data_base64`.
    fn respond(&self, request_id: &str, data_base64: &str) -> bool;
    /// Reports the runtime's current status.
    fn status(&self) -> MeshStatusResult;
    /// Whether the private-object methods below are implemented; `false` by default.
    fn supports_private(&self) -> bool {
        false
    }
    /// Stores a private object and returns a reference to it.
    ///
    /// The default stores nothing and returns a reference with an empty
    /// handle, an empty capability and no expiry.
    fn put_private_object(
        &self,
        _data_base64: &str,
        _policy: MeshObjectPolicy,
    ) -> MeshPrivateObjectRef {
        MeshPrivateObjectRef {
            handle: String::new(),
            capability: String::new(),
            expires_at_ms: None,
        }
    }
    /// Reads a private object by `handle`, authorised by `capability`.
    ///
    /// The default always answers `PolicyDenied`.
    fn get_private_object_result(&self, _handle: &str, _capability: &str) -> MeshObjectResult {
        MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
    }
    /// Issues a query against a private object; the default returns an empty string.
    fn request_private(&self, _handle: &str, _capability: &str, _data_base64: &str) -> String {
        String::new()
    }
    /// Publishes a control envelope on the private channel keyed by `capability`;
    /// the default returns `false`.
    fn publish_private_control(&self, _capability: &str, _envelope: MeshControlEnvelope) -> bool {
        false
    }
    /// Subscribes to private control messages for `capability`; the default returns `false`.
    fn subscribe_private_control(&self, _capability: &str) -> bool {
        false
    }
    /// Publishes a receipt on the private channel keyed by `capability`;
    /// the default returns `false`.
    fn publish_private_receipt(&self, _capability: &str, _data_base64: &str) -> bool {
        false
    }
    /// Subscribes to private receipts for `capability`; the default returns `false`.
    fn subscribe_private_receipt(&self, _capability: &str) -> bool {
        false
    }
    /// Returns the queued host push events; the default has none and returns an empty vector.
    fn drain_events(&self) -> Vec<HostPushEvent> {
        Vec::new()
    }
}
100
/// Configuration for [`InMemoryMeshRuntime`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct InMemoryMeshConfig {
    /// When `true`, `publish` queues a topic event for subscribed topics and
    /// `request` queues a query event ahead of its reply.
    pub emit_loopback_events: bool,
    /// Value copied into the `author` field of every payload the runtime emits.
    pub author_tag: Option<String>,
    /// Transport name reported by `status()`.
    pub transport_name: String,
}
131
132impl Default for InMemoryMeshConfig {
133 fn default() -> Self {
134 Self {
135 emit_loopback_events: false,
136 author_tag: None,
137 transport_name: "in-memory".into(),
138 }
139 }
140}
141
142impl InMemoryMeshConfig {
143 pub fn loopback() -> Self {
146 Self {
147 emit_loopback_events: true,
148 author_tag: Some("loopback".into()),
149 transport_name: "loopback".into(),
150 }
151 }
152}
153
/// [`MeshRuntime`] implementation that keeps all state in process memory.
#[derive(Debug)]
pub struct InMemoryMeshRuntime {
    /// Behaviour switches and labels chosen at construction time.
    config: InMemoryMeshConfig,
    /// Per-instance nonce embedded in generated request ids.
    session_nonce: String,
    /// Counter used to number ids returned by `request`; starts at 1.
    next_request_id: AtomicU64,
    /// Counter shared by private object handles and private request ids; starts at 1.
    next_private_id: AtomicU64,
    /// All mutable runtime state, guarded by a single mutex.
    state: Mutex<InMemoryMeshState>,
}
162
/// Mutable state of an [`InMemoryMeshRuntime`], held behind its mutex.
#[derive(Debug, Default)]
struct InMemoryMeshState {
    /// Topics registered through `subscribe`.
    subscriptions: HashSet<String>,
    /// Public objects keyed by path.
    objects: HashMap<String, StoredMeshObject>,
    /// Private objects keyed by handle.
    private_objects: HashMap<String, StoredPrivateMeshObject>,
    /// Capabilities registered through `subscribe_private_control`.
    private_control_subscriptions: HashSet<String>,
    /// Capabilities registered through `subscribe_private_receipt`.
    private_receipt_subscriptions: HashSet<String>,
    /// Request ids that `respond` will still accept.
    pending_queries: HashSet<String>,
    /// Queued host push events, handed out in FIFO order by `drain_events`.
    events: VecDeque<HostPushEvent>,
}
173
// Channel name constants.
// NOTE(review): none of these are referenced by the code visible in this file;
// presumably they are consumed by transports defined elsewhere — confirm.

/// Channel name for public queries.
pub const MESH_QUERY_CHANNEL: &str = "mesh/query";
/// Channel name for replies.
pub const MESH_REPLY_CHANNEL: &str = "mesh/reply";
/// Channel name for private queries.
pub const MESH_PRIVATE_QUERY_CHANNEL: &str = "mesh/private/query";
/// Channel name for private control messages.
pub const MESH_PRIVATE_CONTROL_CHANNEL: &str = "mesh/private/control";
/// Channel name for private receipts.
pub const MESH_PRIVATE_RECEIPT_CHANNEL: &str = "mesh/private/receipt";
181
/// Serializable form of a public query.
///
/// Field names are rendered in camelCase (`requestId`, `path`, `dataBase64`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MeshWireQuery {
    /// Identifier of the request this query belongs to.
    pub request_id: String,
    /// Path being queried.
    pub path: String,
    /// Query payload; the field name indicates base64 text.
    pub data_base64: String,
}
189
/// Serializable form of a query against a private object.
///
/// Field names are rendered in camelCase (`requestId`, `handle`,
/// `capability`, `dataBase64`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MeshPrivateWireQuery {
    /// Identifier of the request this query belongs to.
    pub request_id: String,
    /// Handle naming the private object.
    pub handle: String,
    /// Capability presented for the handle.
    pub capability: String,
    /// Query payload; the field name indicates base64 text.
    pub data_base64: String,
}
198
/// Serializable form of a reply.
///
/// Field names are rendered in camelCase. `dataBase64` is always written
/// (as `null` when absent); `reason` and `expiresAtMs` are left out of the
/// output when they are `None`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MeshWireReply {
    /// Identifier of the request being answered.
    pub request_id: String,
    /// Reply payload, if any.
    pub data_base64: Option<String>,
    /// Reason attached to the reply, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<MeshObjectReadReason>,
    /// Expiry timestamp attached to the reply, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expires_at_ms: Option<u64>,
}
209
/// Serializable form of a private control message (camelCase field names).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MeshPrivateControlWireMessage {
    /// Capability the message is addressed to.
    pub capability: String,
    /// Control envelope being carried.
    pub envelope: MeshControlEnvelope,
}
216
/// Serializable form of a private receipt (camelCase field names).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MeshPrivateReceiptWireMessage {
    /// Capability the receipt is addressed to.
    pub capability: String,
    /// Receipt payload; the field name indicates base64 text.
    pub data_base64: String,
}
223
/// An object as kept in a mesh store, together with the policy it was put with.
#[derive(Debug, Clone)]
pub struct StoredMeshObject {
    /// Stored payload, returned verbatim on a successful read.
    pub data_base64: String,
    /// Millisecond timestamp at or after which reads report the object as expired;
    /// `None` means it never expires.
    pub expires_at_ms: Option<u64>,
    /// Preview-suppression flag recorded from the put policy (`false` when the
    /// policy left it unset). It does not change what `as_result` returns.
    pub suppress_previews: bool,
}
230
/// A stored object that can only be read by presenting the matching capability.
#[derive(Debug, Clone)]
pub struct StoredPrivateMeshObject {
    /// Capability string a reader must present; compared for exact equality.
    pub capability: String,
    /// The stored object and its policy.
    pub object: StoredMeshObject,
}
236
237impl StoredMeshObject {
238 pub fn from_put(data_base64: &str, policy: MeshObjectPolicy) -> Self {
239 Self {
240 data_base64: data_base64.to_string(),
241 expires_at_ms: policy.expires_at_ms,
242 suppress_previews: policy.suppress_previews.unwrap_or(false),
243 }
244 }
245
246 pub fn as_result(&self, now_ms: u64) -> MeshObjectResult {
247 let _preview_suppressed = self.suppress_previews;
251 if self
252 .expires_at_ms
253 .is_some_and(|expires_at_ms| expires_at_ms <= now_ms)
254 {
255 return MeshObjectResult::unavailable(
256 MeshObjectReadReason::Expired,
257 self.expires_at_ms,
258 );
259 }
260
261 MeshObjectResult::found(self.data_base64.clone(), self.expires_at_ms)
262 }
263}
264
265pub fn private_object_result(
266 entry: Option<&StoredPrivateMeshObject>,
267 capability: &str,
268 now_ms: u64,
269) -> MeshObjectResult {
270 let Some(entry) = entry else {
271 return MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None);
272 };
273 if entry.capability != capability {
274 return MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None);
275 }
276 entry.object.as_result(now_ms)
277}
278
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the clock reads earlier than the epoch and saturates at
/// `u64::MAX` if the millisecond count does not fit in 64 bits.
pub fn current_time_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
286
287impl Default for InMemoryMeshRuntime {
288 fn default() -> Self {
289 Self::new()
290 }
291}
292
293impl InMemoryMeshRuntime {
294 pub fn new() -> Self {
295 Self::with_config(InMemoryMeshConfig::default())
296 }
297
298 pub fn with_config(config: InMemoryMeshConfig) -> Self {
299 Self {
300 config,
301 session_nonce: generate_session_nonce(),
302 next_request_id: AtomicU64::new(1),
303 next_private_id: AtomicU64::new(1),
304 state: Mutex::new(InMemoryMeshState::default()),
305 }
306 }
307
308 pub fn session_nonce(&self) -> &str {
310 &self.session_nonce
311 }
312
313 pub fn insert_pending_query(&self, request_id: &str) {
318 self.state
319 .lock()
320 .unwrap_or_else(|e| e.into_inner())
321 .pending_queries
322 .insert(request_id.to_string());
323 }
324
325 pub fn has_suppress_previews(&self, path: &str) -> bool {
328 self.state
329 .lock()
330 .unwrap_or_else(|e| e.into_inner())
331 .objects
332 .get(path)
333 .map(|o| o.suppress_previews)
334 .unwrap_or(false)
335 }
336}
337
338impl MeshRuntime for InMemoryMeshRuntime {
339 fn publish(&self, topic: &str, data_base64: &str) -> bool {
340 if self.config.emit_loopback_events {
341 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
342 if state.subscriptions.contains(topic) {
343 let payload = MeshTopicPayload {
344 topic: topic.to_string(),
345 data_base64: data_base64.to_string(),
346 author: self.config.author_tag.clone(),
347 };
348 if let Ok(payload_json) = serde_json::to_string(&payload) {
349 state.events.push_back(HostPushEvent {
350 event: MESH_TOPIC.to_string(),
351 payload_json,
352 });
353 }
354 }
355 }
356 true
357 }
358
359 fn subscribe(&self, topic: &str) -> bool {
360 self.state
361 .lock()
362 .unwrap_or_else(|e| e.into_inner())
363 .subscriptions
364 .insert(topic.to_string());
365 true
366 }
367
368 fn put_object(&self, path: &str, data_base64: &str, policy: MeshObjectPolicy) -> bool {
369 self.state
370 .lock()
371 .unwrap_or_else(|e| e.into_inner())
372 .objects
373 .insert(
374 path.to_string(),
375 StoredMeshObject::from_put(data_base64, policy),
376 );
377 true
378 }
379
380 fn get_object_result(&self, path: &str) -> MeshObjectResult {
381 self.state
382 .lock()
383 .unwrap_or_else(|e| e.into_inner())
384 .objects
385 .get(path)
386 .map(|object| object.as_result(current_time_ms()))
387 .unwrap_or_else(|| MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None))
388 }
389
390 fn request(&self, path: &str, data_base64: &str) -> String {
391 let request_id = format!(
392 "mesh-req-{}-{}",
393 self.session_nonce,
394 self.next_request_id.fetch_add(1, Ordering::Relaxed)
395 );
396 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
397 state.pending_queries.insert(request_id.clone());
398
399 if self.config.emit_loopback_events {
401 let query_payload = MeshQueryPayload {
402 request_id: request_id.clone(),
403 path: path.to_string(),
404 data_base64: data_base64.to_string(),
405 author: self.config.author_tag.clone(),
406 };
407 if let Ok(payload_json) = serde_json::to_string(&query_payload) {
408 state.events.push_back(HostPushEvent {
409 event: MESH_QUERY.to_string(),
410 payload_json,
411 });
412 }
413 }
414
415 let result = state
418 .objects
419 .get(path)
420 .map(|object| object.as_result(current_time_ms()))
421 .unwrap_or_else(|| MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None));
422 state.pending_queries.remove(&request_id);
423 let reply_payload = MeshReplyPayload {
424 request_id: request_id.clone(),
425 data_base64: result.data_base64,
426 reason: result.reason,
427 expires_at_ms: result.expires_at_ms,
428 author: self.config.author_tag.clone(),
429 };
430 if let Ok(payload_json) = serde_json::to_string(&reply_payload) {
431 state.events.push_back(HostPushEvent {
432 event: MESH_REPLY.to_string(),
433 payload_json,
434 });
435 }
436
437 request_id
438 }
439
440 fn respond(&self, request_id: &str, data_base64: &str) -> bool {
447 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
448 if !state.pending_queries.remove(request_id) {
449 return false;
450 }
451 let payload = MeshReplyPayload {
452 request_id: request_id.to_string(),
453 data_base64: Some(data_base64.to_string()),
454 reason: None,
455 expires_at_ms: None,
456 author: self.config.author_tag.clone(),
457 };
458 if let Ok(payload_json) = serde_json::to_string(&payload) {
459 state.events.push_back(HostPushEvent {
460 event: MESH_REPLY.to_string(),
461 payload_json,
462 });
463 }
464 true
465 }
466
467 fn status(&self) -> MeshStatusResult {
468 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
469 MeshStatusResult {
470 health: "healthy".to_string(),
471 transport: self.config.transport_name.clone(),
472 pending_publishes: 0,
473 pending_queries: state.pending_queries.len() as u64,
474 last_error: None,
475 }
476 }
477
478 fn supports_private(&self) -> bool {
479 true
480 }
481
482 fn put_private_object(
483 &self,
484 data_base64: &str,
485 policy: MeshObjectPolicy,
486 ) -> MeshPrivateObjectRef {
487 let id = self.next_private_id.fetch_add(1, Ordering::Relaxed);
488 let handle = format!("mesh-private-handle-{id}");
489 let capability = format!("mesh-private-capability-{id}");
490 let expires_at_ms = policy.expires_at_ms;
491 self.state
492 .lock()
493 .unwrap_or_else(|e| e.into_inner())
494 .private_objects
495 .insert(
496 handle.clone(),
497 StoredPrivateMeshObject {
498 capability: capability.clone(),
499 object: StoredMeshObject::from_put(data_base64, policy),
500 },
501 );
502 MeshPrivateObjectRef {
503 handle,
504 capability,
505 expires_at_ms,
506 }
507 }
508
509 fn get_private_object_result(&self, handle: &str, capability: &str) -> MeshObjectResult {
510 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
511 private_object_result(
512 state.private_objects.get(handle),
513 capability,
514 current_time_ms(),
515 )
516 }
517
518 fn request_private(&self, handle: &str, capability: &str, _data_base64: &str) -> String {
519 let request_id = format!(
520 "mesh-private-req-{}-{}",
521 self.session_nonce,
522 self.next_private_id.fetch_add(1, Ordering::Relaxed)
523 );
524 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
525 state.pending_queries.insert(request_id.clone());
526 let result = private_object_result(
527 state.private_objects.get(handle),
528 capability,
529 current_time_ms(),
530 );
531 state.pending_queries.remove(&request_id);
532 let payload = MeshReplyPayload {
533 request_id: request_id.clone(),
534 data_base64: result.data_base64,
535 reason: result.reason,
536 expires_at_ms: result.expires_at_ms,
537 author: self.config.author_tag.clone(),
538 };
539 if let Ok(payload_json) = serde_json::to_string(&payload) {
540 state.events.push_back(HostPushEvent {
541 event: MESH_REPLY.to_string(),
542 payload_json,
543 });
544 }
545 request_id
546 }
547
548 fn publish_private_control(&self, capability: &str, envelope: MeshControlEnvelope) -> bool {
549 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
550 if state.private_control_subscriptions.contains(capability) {
551 let payload = MeshPrivateControlPayload {
552 capability: capability.to_string(),
553 envelope,
554 author: self.config.author_tag.clone(),
555 };
556 if let Ok(payload_json) = serde_json::to_string(&payload) {
557 state.events.push_back(HostPushEvent {
558 event: MESH_PRIVATE_CONTROL.to_string(),
559 payload_json,
560 });
561 }
562 }
563 true
564 }
565
566 fn subscribe_private_control(&self, capability: &str) -> bool {
567 self.state
568 .lock()
569 .unwrap_or_else(|e| e.into_inner())
570 .private_control_subscriptions
571 .insert(capability.to_string());
572 true
573 }
574
575 fn publish_private_receipt(&self, capability: &str, data_base64: &str) -> bool {
576 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
577 if state.private_receipt_subscriptions.contains(capability) {
578 let payload = MeshPrivateReceiptPayload {
579 capability: capability.to_string(),
580 data_base64: data_base64.to_string(),
581 author: self.config.author_tag.clone(),
582 };
583 if let Ok(payload_json) = serde_json::to_string(&payload) {
584 state.events.push_back(HostPushEvent {
585 event: MESH_PRIVATE_RECEIPT.to_string(),
586 payload_json,
587 });
588 }
589 }
590 true
591 }
592
593 fn subscribe_private_receipt(&self, capability: &str) -> bool {
594 self.state
595 .lock()
596 .unwrap_or_else(|e| e.into_inner())
597 .private_receipt_subscriptions
598 .insert(capability.to_string());
599 true
600 }
601
602 fn drain_events(&self) -> Vec<HostPushEvent> {
603 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
604 state.events.drain(..).collect()
605 }
606}
607
// Unit tests for `InMemoryMeshRuntime`. Several assertions pin the exact JSON
// text of queued events, so field order and omitted keys are part of what is
// being checked.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor_contract::MeshControlMode;

    // Private publishes still report success without a subscription, but queue nothing.
    #[test]
    fn in_memory_runtime_publish_without_subscription_emits_no_events() {
        let runtime = InMemoryMeshRuntime::new();
        assert!(runtime.publish_private_control(
            "unsubscribed-capability",
            MeshControlEnvelope {
                mode: MeshControlMode::Encrypted,
                data_base64: "AQID".into(),
            },
        ));
        assert!(runtime.publish_private_receipt("unsubscribed-capability", "AQID"));
        assert!(runtime.drain_events().is_empty());
    }

    // A stored, unexpired object is readable through both read APIs and keeps its policy flag.
    #[test]
    fn in_memory_runtime_returns_structured_object_results() {
        let runtime = InMemoryMeshRuntime::new();
        assert!(runtime.put_object(
            "mesh/object/1",
            "AQID",
            MeshObjectPolicy {
                expires_at_ms: Some(u64::MAX),
                suppress_previews: Some(true),
            },
        ));

        assert_eq!(
            runtime.get_object_result("mesh/object/1"),
            MeshObjectResult::found("AQID".into(), Some(u64::MAX))
        );
        assert_eq!(runtime.get_object("mesh/object/1").as_deref(), Some("AQID"));
        assert!(runtime.has_suppress_previews("mesh/object/1"));
    }

    // An expiry of 1 ms after the epoch is long past, so the read reports `Expired`.
    #[test]
    fn in_memory_runtime_denies_expired_object_reads() {
        let runtime = InMemoryMeshRuntime::new();
        assert!(runtime.put_object(
            "mesh/object/expired",
            "AQID",
            MeshObjectPolicy {
                expires_at_ms: Some(1),
                suppress_previews: None,
            },
        ));

        assert_eq!(
            runtime.get_object_result("mesh/object/expired"),
            MeshObjectResult::unavailable(MeshObjectReadReason::Expired, Some(1))
        );
        assert_eq!(runtime.get_object("mesh/object/expired"), None);
    }

    // Reading a path nothing was stored at reports `NotFound`.
    #[test]
    fn in_memory_runtime_reports_missing_object() {
        let runtime = InMemoryMeshRuntime::new();

        assert_eq!(
            runtime.get_object_result("mesh/object/missing"),
            MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None)
        );
        assert_eq!(runtime.get_object("mesh/object/missing"), None);
    }

    // `respond` rejects ids that were never registered as pending.
    #[test]
    fn test_respond_with_unknown_request_id_returns_false() {
        let runtime = InMemoryMeshRuntime::new();
        assert!(
            !runtime.respond("mesh-req-nonexistent-99", "AQID"),
            "respond with unknown request_id must return false"
        );
    }

    // A private object is readable with its own reference, and `request_private`
    // queues exactly one reply whose JSON has no `author` or `reason` key.
    #[test]
    fn in_memory_runtime_supports_private_object_reads_and_repairs() {
        let runtime = InMemoryMeshRuntime::new();
        let reference = runtime.put_private_object(
            "AQID",
            MeshObjectPolicy {
                expires_at_ms: Some(u64::MAX),
                suppress_previews: Some(true),
            },
        );

        assert_eq!(
            runtime.get_private_object_result(&reference.handle, &reference.capability),
            MeshObjectResult::found("AQID".into(), Some(u64::MAX))
        );

        let request_id = runtime.request_private(&reference.handle, &reference.capability, "");
        let events = runtime.drain_events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event, "meshReply");
        assert_eq!(
            events[0].payload_json,
            format!(
                r#"{{"requestId":"{request_id}","dataBase64":"AQID","expiresAtMs":18446744073709551615}}"#
            )
        );
    }

    // Two runtimes issue their first request at the same counter value; the
    // ids must still differ because of the session nonce.
    #[test]
    fn test_two_in_memory_runtimes_generate_non_colliding_request_ids() {
        let runtime_a = InMemoryMeshRuntime::new();
        let runtime_b = InMemoryMeshRuntime::new();

        let id_a = runtime_a.request("mesh/object/1", "");
        let id_b = runtime_b.request("mesh/object/1", "");

        assert_ne!(id_a, id_b, "different runtimes at same counter must differ");

        // Ids look like `mesh-req-{nonce}-{counter}`, so the nonce is the third `-` segment.
        let nonce_a = id_a
            .split('-')
            .nth(2)
            .expect("nonce segment must be present");
        let nonce_b = id_b
            .split('-')
            .nth(2)
            .expect("nonce segment must be present");
        assert_ne!(nonce_a, nonce_b, "session nonces must differ");
    }

    // A wrong capability and an unknown handle both answer `PolicyDenied`.
    #[test]
    fn in_memory_runtime_denies_guessed_private_access() {
        let runtime = InMemoryMeshRuntime::new();
        let reference = runtime.put_private_object("AQID", MeshObjectPolicy::default());

        assert_eq!(
            runtime.get_private_object_result(&reference.handle, "wrong-capability"),
            MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
        );
        assert_eq!(
            runtime.get_private_object_result("guessed-handle", "guessed-capability"),
            MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
        );
    }

    // An expired private object reports `Expired` on direct reads and in the queued reply.
    #[test]
    fn in_memory_runtime_expires_private_capabilities() {
        let runtime = InMemoryMeshRuntime::new();
        let reference = runtime.put_private_object(
            "AQID",
            MeshObjectPolicy {
                expires_at_ms: Some(1),
                suppress_previews: None,
            },
        );

        assert_eq!(
            runtime.get_private_object_result(&reference.handle, &reference.capability),
            MeshObjectResult::unavailable(MeshObjectReadReason::Expired, Some(1))
        );

        let request_id = runtime.request_private(&reference.handle, &reference.capability, "");
        let events = runtime.drain_events();
        assert_eq!(
            events[0].payload_json,
            format!(
                r#"{{"requestId":"{request_id}","dataBase64":null,"reason":"expired","expiresAtMs":1}}"#
            )
        );
    }

    // With the default config, `request` for a stored object queues only the
    // reply and leaves nothing pending.
    #[test]
    fn in_memory_runtime_emits_mesh_reply_on_cache_hit() {
        let runtime = InMemoryMeshRuntime::new();
        assert!(runtime.put_object(
            "mesh/object/1",
            "BAUG",
            MeshObjectPolicy {
                expires_at_ms: Some(u64::MAX),
                suppress_previews: None,
            },
        ));

        let request_id = runtime.request("mesh/object/1", "AQID");
        let events = runtime.drain_events();
        assert_eq!(events.len(), 1, "cache hit must emit exactly one meshReply");
        assert_eq!(events[0].event, "meshReply");
        assert_eq!(
            events[0].payload_json,
            format!(
                r#"{{"requestId":"{request_id}","dataBase64":"BAUG","expiresAtMs":18446744073709551615}}"#
            )
        );
        assert_eq!(
            runtime.status().pending_queries,
            0,
            "pending must be cleared after cache-hit auto-resolve"
        );
    }

    // A request for a missing object still queues a reply, with a `not_found` reason.
    #[test]
    fn in_memory_runtime_emits_mesh_reply_for_not_found() {
        let runtime = InMemoryMeshRuntime::new();
        let request_id = runtime.request("mesh/object/missing", "AQID");
        let events = runtime.drain_events();
        assert_eq!(events.len(), 1, "must emit meshReply even for not-found");
        assert_eq!(events[0].event, "meshReply");
        assert_eq!(
            events[0].payload_json,
            format!(r#"{{"requestId":"{request_id}","dataBase64":null,"reason":"not_found"}}"#)
        );
    }

    // In loopback mode a publish to a subscribed topic is echoed back, tagged with the author.
    #[test]
    fn in_memory_runtime_loopback_config_emits_topic_on_subscribed_publish() {
        let runtime = InMemoryMeshRuntime::with_config(InMemoryMeshConfig::loopback());
        assert!(runtime.subscribe("room/1"));
        assert!(runtime.publish("room/1", "AQID"));

        let events = runtime.drain_events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event, "meshTopic");
        assert!(events[0].payload_json.contains("\"author\":\"loopback\""));
    }

    // In loopback mode `request` queues the query first and the reply second.
    #[test]
    fn in_memory_runtime_loopback_config_emits_query_and_reply_on_request() {
        let runtime = InMemoryMeshRuntime::with_config(InMemoryMeshConfig::loopback());
        let request_id = runtime.request("mesh/object/missing", "AQID");
        let events = runtime.drain_events();
        assert_eq!(
            events.len(),
            2,
            "loopback must emit meshQuery then meshReply"
        );
        assert_eq!(events[0].event, "meshQuery");
        assert_eq!(events[1].event, "meshReply");
        assert_eq!(
            events[0].payload_json,
            format!(
                r#"{{"requestId":"{request_id}","path":"mesh/object/missing","dataBase64":"AQID","author":"loopback"}}"#
            )
        );
    }

    // `respond` succeeds for an id registered through `insert_pending_query`.
    #[test]
    fn in_memory_runtime_loopback_config_emits_reply_on_respond() {
        let runtime = InMemoryMeshRuntime::with_config(InMemoryMeshConfig::loopback());
        let external_id = format!("external-req-{}", runtime.session_nonce());
        // Register the id directly, because `request` would resolve it immediately.
        runtime.insert_pending_query(&external_id);
        // Start from an empty queue so only the event from `respond` is counted.
        let _ = runtime.drain_events();
        assert!(runtime.respond(&external_id, "BAUG"));
        let events = runtime.drain_events();
        assert_eq!(
            events.len(),
            1,
            "respond in loopback mode must emit meshReply"
        );
        assert_eq!(events[0].event, "meshReply");
    }

    // The loopback config reports its own transport name.
    #[test]
    fn in_memory_runtime_loopback_status_reports_loopback_transport() {
        let runtime = InMemoryMeshRuntime::with_config(InMemoryMeshConfig::loopback());
        assert_eq!(runtime.status().transport, "loopback");
    }

    // The default config reports the "in-memory" transport name.
    #[test]
    fn in_memory_runtime_default_status_reports_in_memory_transport() {
        let runtime = InMemoryMeshRuntime::new();
        assert_eq!(runtime.status().transport, "in-memory");
    }

    // `request` resolves itself, so its id is no longer pending when `respond` is called.
    #[test]
    fn in_memory_runtime_request_auto_resolves_so_respond_returns_false() {
        let runtime = InMemoryMeshRuntime::new();
        assert!(runtime.put_object(
            "mesh/object/1",
            "BAUG",
            MeshObjectPolicy {
                expires_at_ms: Some(u64::MAX),
                suppress_previews: None,
            },
        ));

        let request_id = runtime.request("mesh/object/1", "AQID");
        assert!(
            !runtime.respond(&request_id, "BAUG"),
            "respond after auto-resolved request must return false"
        );
        // The rejected `respond` must not have queued a second reply.
        let events = runtime.drain_events();
        assert_eq!(events.len(), 1, "only the auto-reply from request()");
        assert_eq!(events[0].event, "meshReply");
    }
}