Skip to main content

host_extensions/first_party/
mesh_runtime.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::{
3    atomic::{AtomicU64, Ordering},
4    Mutex,
5};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use crate::{
9    events::{
10        MeshPrivateControlPayload, MeshPrivateReceiptPayload, MeshQueryPayload, MeshReplyPayload,
11        MeshTopicPayload, MESH_PRIVATE_CONTROL, MESH_PRIVATE_RECEIPT, MESH_QUERY, MESH_REPLY,
12        MESH_TOPIC,
13    },
14    executor_contract::{
15        MeshControlEnvelope, MeshObjectPolicy, MeshObjectReadReason, MeshObjectResult,
16        MeshPrivateObjectRef, MeshStatusResult,
17    },
18    HostPushEvent,
19};
20
21/// Generate a random 32-character hex nonce for session-unique request IDs.
22///
23/// 16 bytes (128 bits of entropy) keeps cross-run request ID collisions
24/// negligible even over very large numbers of sequential restarts.
25///
26/// # Fallback
27///
28/// When `getrandom` fails (e.g. early WASM boot without the `js` feature),
29/// falls back to time + stack address + monotonic counter. This is NOT
30/// cryptographically secure but is sufficient to prevent request ID reuse
31/// across sequential restarts. On all supported platforms (native + WASM
32/// with `js` feature), `getrandom` should always succeed in practice,
33/// though the fallback path exists as a safety net.
34pub fn generate_session_nonce() -> String {
35    use std::sync::atomic::{AtomicU64, Ordering};
36    static FALLBACK_CTR: AtomicU64 = AtomicU64::new(0);
37
38    let mut bytes = [0u8; 16];
39    if getrandom::getrandom(&mut bytes).is_err() {
40        log::warn!("getrandom failed — using fallback nonce (not cryptographically secure)");
41        let t = SystemTime::now()
42            .duration_since(UNIX_EPOCH)
43            .unwrap_or_default()
44            .as_nanos()
45            .to_le_bytes();
46        let ctr = FALLBACK_CTR.fetch_add(1, Ordering::Relaxed).to_le_bytes();
47        bytes[..8].copy_from_slice(&t[..8]);
48        bytes[8..].copy_from_slice(&ctr);
49    }
50    bytes.iter().map(|b| format!("{b:02x}")).collect()
51}
52
53pub trait MeshRuntime: Send + Sync + 'static {
54    fn publish(&self, topic: &str, data_base64: &str) -> bool;
55    fn subscribe(&self, topic: &str) -> bool;
56    fn put_object(&self, path: &str, data_base64: &str, policy: MeshObjectPolicy) -> bool;
57    fn get_object_result(&self, path: &str) -> MeshObjectResult;
58    fn get_object(&self, path: &str) -> Option<String> {
59        self.get_object_result(path).compat_data()
60    }
61    fn request(&self, path: &str, data_base64: &str) -> String;
62    fn respond(&self, request_id: &str, data_base64: &str) -> bool;
63    fn status(&self) -> MeshStatusResult;
64    fn supports_private(&self) -> bool {
65        false
66    }
67    fn put_private_object(
68        &self,
69        _data_base64: &str,
70        _policy: MeshObjectPolicy,
71    ) -> MeshPrivateObjectRef {
72        MeshPrivateObjectRef {
73            handle: String::new(),
74            capability: String::new(),
75            expires_at_ms: None,
76        }
77    }
78    fn get_private_object_result(&self, _handle: &str, _capability: &str) -> MeshObjectResult {
79        MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
80    }
81    fn request_private(&self, _handle: &str, _capability: &str, _data_base64: &str) -> String {
82        String::new()
83    }
84    fn publish_private_control(&self, _capability: &str, _envelope: MeshControlEnvelope) -> bool {
85        false
86    }
87    fn subscribe_private_control(&self, _capability: &str) -> bool {
88        false
89    }
90    fn publish_private_receipt(&self, _capability: &str, _data_base64: &str) -> bool {
91        false
92    }
93    fn subscribe_private_receipt(&self, _capability: &str) -> bool {
94        false
95    }
96    fn drain_events(&self) -> Vec<HostPushEvent> {
97        Vec::new()
98    }
99}
100
101/// Configuration for `InMemoryMeshRuntime` behavior.
102///
103/// The defaults produce a minimal runtime suitable for unit testing.
104/// Use [`InMemoryMeshConfig::loopback`] for a full-featured loopback
105/// runtime that emits all mesh events (query, reply, topic).
106///
107/// # Event emission model
108///
109/// `request()` **always** emits a `meshReply` event (fix #129), regardless
110/// of `emit_loopback_events`. When `emit_loopback_events` is `true`,
111/// additional events are emitted: `meshQuery` on `request()`, `meshReply`
112/// on `respond()`, and `meshTopic` on `publish()`.
113///
114/// Private operations (`request_private`, `publish_private_control`,
115/// `publish_private_receipt`) always emit their events when subscribed,
116/// regardless of `emit_loopback_events`, because private mesh operations
117/// are inherently interactive and omitting events would break the private
118/// object handshake protocol.
119#[derive(Debug, Clone)]
120#[non_exhaustive]
121pub struct InMemoryMeshConfig {
122    /// When `true`, `request()` additionally emits `meshQuery` events,
123    /// `respond()` emits `meshReply`, and `publish()` emits `meshTopic`
124    /// for active subscriptions.
125    pub emit_loopback_events: bool,
126    /// Author tag attached to emitted events (e.g. `Some("loopback")`).
127    pub author_tag: Option<String>,
128    /// Transport name returned by `status()`.
129    pub transport_name: String,
130}
131
132impl Default for InMemoryMeshConfig {
133    fn default() -> Self {
134        Self {
135            emit_loopback_events: false,
136            author_tag: None,
137            transport_name: "in-memory".into(),
138        }
139    }
140}
141
142impl InMemoryMeshConfig {
143    /// Configuration for a loopback runtime that echoes all operations
144    /// back as push events — suitable for WebView integration testing.
145    pub fn loopback() -> Self {
146        Self {
147            emit_loopback_events: true,
148            author_tag: Some("loopback".into()),
149            transport_name: "loopback".into(),
150        }
151    }
152}
153
154#[derive(Debug)]
155pub struct InMemoryMeshRuntime {
156    config: InMemoryMeshConfig,
157    session_nonce: String,
158    next_request_id: AtomicU64,
159    next_private_id: AtomicU64,
160    state: Mutex<InMemoryMeshState>,
161}
162
163#[derive(Debug, Default)]
164struct InMemoryMeshState {
165    subscriptions: HashSet<String>,
166    objects: HashMap<String, StoredMeshObject>,
167    private_objects: HashMap<String, StoredPrivateMeshObject>,
168    private_control_subscriptions: HashSet<String>,
169    private_receipt_subscriptions: HashSet<String>,
170    pending_queries: HashSet<String>,
171    events: VecDeque<HostPushEvent>,
172}
173
174// ── Wire types for statement-store transport ────────────────────────────
175
176pub const MESH_QUERY_CHANNEL: &str = "mesh/query";
177pub const MESH_REPLY_CHANNEL: &str = "mesh/reply";
178pub const MESH_PRIVATE_QUERY_CHANNEL: &str = "mesh/private/query";
179pub const MESH_PRIVATE_CONTROL_CHANNEL: &str = "mesh/private/control";
180pub const MESH_PRIVATE_RECEIPT_CHANNEL: &str = "mesh/private/receipt";
181
182#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
183#[serde(rename_all = "camelCase")]
184pub struct MeshWireQuery {
185    pub request_id: String,
186    pub path: String,
187    pub data_base64: String,
188}
189
190#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
191#[serde(rename_all = "camelCase")]
192pub struct MeshPrivateWireQuery {
193    pub request_id: String,
194    pub handle: String,
195    pub capability: String,
196    pub data_base64: String,
197}
198
199#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
200#[serde(rename_all = "camelCase")]
201pub struct MeshWireReply {
202    pub request_id: String,
203    pub data_base64: Option<String>,
204    #[serde(skip_serializing_if = "Option::is_none")]
205    pub reason: Option<MeshObjectReadReason>,
206    #[serde(skip_serializing_if = "Option::is_none")]
207    pub expires_at_ms: Option<u64>,
208}
209
210#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
211#[serde(rename_all = "camelCase")]
212pub struct MeshPrivateControlWireMessage {
213    pub capability: String,
214    pub envelope: MeshControlEnvelope,
215}
216
217#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
218#[serde(rename_all = "camelCase")]
219pub struct MeshPrivateReceiptWireMessage {
220    pub capability: String,
221    pub data_base64: String,
222}
223
224#[derive(Debug, Clone)]
225pub struct StoredMeshObject {
226    pub data_base64: String,
227    pub expires_at_ms: Option<u64>,
228    pub suppress_previews: bool,
229}
230
231#[derive(Debug, Clone)]
232pub struct StoredPrivateMeshObject {
233    pub capability: String,
234    pub object: StoredMeshObject,
235}
236
237impl StoredMeshObject {
238    pub fn from_put(data_base64: &str, policy: MeshObjectPolicy) -> Self {
239        Self {
240            data_base64: data_base64.to_string(),
241            expires_at_ms: policy.expires_at_ms,
242            suppress_previews: policy.suppress_previews.unwrap_or(false),
243        }
244    }
245
246    pub fn as_result(&self, now_ms: u64) -> MeshObjectResult {
247        // These runtimes do not synthesize previews, but they persist the
248        // effective policy so compliant hosts can honor suppression without
249        // changing the object-store contract.
250        let _preview_suppressed = self.suppress_previews;
251        if self
252            .expires_at_ms
253            .is_some_and(|expires_at_ms| expires_at_ms <= now_ms)
254        {
255            return MeshObjectResult::unavailable(
256                MeshObjectReadReason::Expired,
257                self.expires_at_ms,
258            );
259        }
260
261        MeshObjectResult::found(self.data_base64.clone(), self.expires_at_ms)
262    }
263}
264
265pub fn private_object_result(
266    entry: Option<&StoredPrivateMeshObject>,
267    capability: &str,
268    now_ms: u64,
269) -> MeshObjectResult {
270    let Some(entry) = entry else {
271        return MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None);
272    };
273    if entry.capability != capability {
274        return MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None);
275    }
276    entry.object.as_result(now_ms)
277}
278
279pub fn current_time_ms() -> u64 {
280    SystemTime::now()
281        .duration_since(UNIX_EPOCH)
282        .unwrap_or_default()
283        .as_millis()
284        .min(u128::from(u64::MAX)) as u64
285}
286
287impl Default for InMemoryMeshRuntime {
288    fn default() -> Self {
289        Self::new()
290    }
291}
292
293impl InMemoryMeshRuntime {
294    pub fn new() -> Self {
295        Self::with_config(InMemoryMeshConfig::default())
296    }
297
298    pub fn with_config(config: InMemoryMeshConfig) -> Self {
299        Self {
300            config,
301            session_nonce: generate_session_nonce(),
302            next_request_id: AtomicU64::new(1),
303            next_private_id: AtomicU64::new(1),
304            state: Mutex::new(InMemoryMeshState::default()),
305        }
306    }
307
308    /// Returns the session nonce used for request ID generation.
309    pub fn session_nonce(&self) -> &str {
310        &self.session_nonce
311    }
312
313    /// Test helper: insert a request ID into pending queries.
314    ///
315    /// Useful for testing `respond()` without going through `request()`,
316    /// which auto-resolves and clears pending state.
317    pub fn insert_pending_query(&self, request_id: &str) {
318        self.state
319            .lock()
320            .unwrap_or_else(|e| e.into_inner())
321            .pending_queries
322            .insert(request_id.to_string());
323    }
324
325    /// Test helper: check whether an object at the given path has
326    /// `suppress_previews` set.
327    pub fn has_suppress_previews(&self, path: &str) -> bool {
328        self.state
329            .lock()
330            .unwrap_or_else(|e| e.into_inner())
331            .objects
332            .get(path)
333            .map(|o| o.suppress_previews)
334            .unwrap_or(false)
335    }
336}
337
338impl MeshRuntime for InMemoryMeshRuntime {
339    fn publish(&self, topic: &str, data_base64: &str) -> bool {
340        if self.config.emit_loopback_events {
341            let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
342            if state.subscriptions.contains(topic) {
343                let payload = MeshTopicPayload {
344                    topic: topic.to_string(),
345                    data_base64: data_base64.to_string(),
346                    author: self.config.author_tag.clone(),
347                };
348                if let Ok(payload_json) = serde_json::to_string(&payload) {
349                    state.events.push_back(HostPushEvent {
350                        event: MESH_TOPIC.to_string(),
351                        payload_json,
352                    });
353                }
354            }
355        }
356        true
357    }
358
359    fn subscribe(&self, topic: &str) -> bool {
360        self.state
361            .lock()
362            .unwrap_or_else(|e| e.into_inner())
363            .subscriptions
364            .insert(topic.to_string());
365        true
366    }
367
368    fn put_object(&self, path: &str, data_base64: &str, policy: MeshObjectPolicy) -> bool {
369        self.state
370            .lock()
371            .unwrap_or_else(|e| e.into_inner())
372            .objects
373            .insert(
374                path.to_string(),
375                StoredMeshObject::from_put(data_base64, policy),
376            );
377        true
378    }
379
380    fn get_object_result(&self, path: &str) -> MeshObjectResult {
381        self.state
382            .lock()
383            .unwrap_or_else(|e| e.into_inner())
384            .objects
385            .get(path)
386            .map(|object| object.as_result(current_time_ms()))
387            .unwrap_or_else(|| MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None))
388    }
389
390    fn request(&self, path: &str, data_base64: &str) -> String {
391        let request_id = format!(
392            "mesh-req-{}-{}",
393            self.session_nonce,
394            self.next_request_id.fetch_add(1, Ordering::Relaxed)
395        );
396        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
397        state.pending_queries.insert(request_id.clone());
398
399        // Emit meshQuery when loopback events are enabled (fix #128).
400        if self.config.emit_loopback_events {
401            let query_payload = MeshQueryPayload {
402                request_id: request_id.clone(),
403                path: path.to_string(),
404                data_base64: data_base64.to_string(),
405                author: self.config.author_tag.clone(),
406            };
407            if let Ok(payload_json) = serde_json::to_string(&query_payload) {
408                state.events.push_back(HostPushEvent {
409                    event: MESH_QUERY.to_string(),
410                    payload_json,
411                });
412            }
413        }
414
415        // Resolve immediately from the local object store and always emit
416        // meshReply — fixes the cache-hit gap (#129).
417        let result = state
418            .objects
419            .get(path)
420            .map(|object| object.as_result(current_time_ms()))
421            .unwrap_or_else(|| MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None));
422        state.pending_queries.remove(&request_id);
423        let reply_payload = MeshReplyPayload {
424            request_id: request_id.clone(),
425            data_base64: result.data_base64,
426            reason: result.reason,
427            expires_at_ms: result.expires_at_ms,
428            author: self.config.author_tag.clone(),
429        };
430        if let Ok(payload_json) = serde_json::to_string(&reply_payload) {
431            state.events.push_back(HostPushEvent {
432                event: MESH_REPLY.to_string(),
433                payload_json,
434            });
435        }
436
437        request_id
438    }
439
440    /// Marks a pending query as answered. Returns `true` if the request was
441    /// pending, emitting a `meshReply` event with the response data.
442    ///
443    /// This matches the contract of `request()`, which always emits
444    /// `meshReply` — callers can rely on `drain_events()` to observe
445    /// responses regardless of configuration.
446    fn respond(&self, request_id: &str, data_base64: &str) -> bool {
447        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
448        if !state.pending_queries.remove(request_id) {
449            return false;
450        }
451        let payload = MeshReplyPayload {
452            request_id: request_id.to_string(),
453            data_base64: Some(data_base64.to_string()),
454            reason: None,
455            expires_at_ms: None,
456            author: self.config.author_tag.clone(),
457        };
458        if let Ok(payload_json) = serde_json::to_string(&payload) {
459            state.events.push_back(HostPushEvent {
460                event: MESH_REPLY.to_string(),
461                payload_json,
462            });
463        }
464        true
465    }
466
467    fn status(&self) -> MeshStatusResult {
468        let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
469        MeshStatusResult {
470            health: "healthy".to_string(),
471            transport: self.config.transport_name.clone(),
472            pending_publishes: 0,
473            pending_queries: state.pending_queries.len() as u64,
474            last_error: None,
475        }
476    }
477
478    fn supports_private(&self) -> bool {
479        true
480    }
481
482    fn put_private_object(
483        &self,
484        data_base64: &str,
485        policy: MeshObjectPolicy,
486    ) -> MeshPrivateObjectRef {
487        let id = self.next_private_id.fetch_add(1, Ordering::Relaxed);
488        let handle = format!("mesh-private-handle-{id}");
489        let capability = format!("mesh-private-capability-{id}");
490        let expires_at_ms = policy.expires_at_ms;
491        self.state
492            .lock()
493            .unwrap_or_else(|e| e.into_inner())
494            .private_objects
495            .insert(
496                handle.clone(),
497                StoredPrivateMeshObject {
498                    capability: capability.clone(),
499                    object: StoredMeshObject::from_put(data_base64, policy),
500                },
501            );
502        MeshPrivateObjectRef {
503            handle,
504            capability,
505            expires_at_ms,
506        }
507    }
508
509    fn get_private_object_result(&self, handle: &str, capability: &str) -> MeshObjectResult {
510        let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
511        private_object_result(
512            state.private_objects.get(handle),
513            capability,
514            current_time_ms(),
515        )
516    }
517
518    fn request_private(&self, handle: &str, capability: &str, _data_base64: &str) -> String {
519        let request_id = format!(
520            "mesh-private-req-{}-{}",
521            self.session_nonce,
522            self.next_private_id.fetch_add(1, Ordering::Relaxed)
523        );
524        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
525        state.pending_queries.insert(request_id.clone());
526        let result = private_object_result(
527            state.private_objects.get(handle),
528            capability,
529            current_time_ms(),
530        );
531        state.pending_queries.remove(&request_id);
532        let payload = MeshReplyPayload {
533            request_id: request_id.clone(),
534            data_base64: result.data_base64,
535            reason: result.reason,
536            expires_at_ms: result.expires_at_ms,
537            author: self.config.author_tag.clone(),
538        };
539        if let Ok(payload_json) = serde_json::to_string(&payload) {
540            state.events.push_back(HostPushEvent {
541                event: MESH_REPLY.to_string(),
542                payload_json,
543            });
544        }
545        request_id
546    }
547
548    fn publish_private_control(&self, capability: &str, envelope: MeshControlEnvelope) -> bool {
549        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
550        if state.private_control_subscriptions.contains(capability) {
551            let payload = MeshPrivateControlPayload {
552                capability: capability.to_string(),
553                envelope,
554                author: self.config.author_tag.clone(),
555            };
556            if let Ok(payload_json) = serde_json::to_string(&payload) {
557                state.events.push_back(HostPushEvent {
558                    event: MESH_PRIVATE_CONTROL.to_string(),
559                    payload_json,
560                });
561            }
562        }
563        true
564    }
565
566    fn subscribe_private_control(&self, capability: &str) -> bool {
567        self.state
568            .lock()
569            .unwrap_or_else(|e| e.into_inner())
570            .private_control_subscriptions
571            .insert(capability.to_string());
572        true
573    }
574
575    fn publish_private_receipt(&self, capability: &str, data_base64: &str) -> bool {
576        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
577        if state.private_receipt_subscriptions.contains(capability) {
578            let payload = MeshPrivateReceiptPayload {
579                capability: capability.to_string(),
580                data_base64: data_base64.to_string(),
581                author: self.config.author_tag.clone(),
582            };
583            if let Ok(payload_json) = serde_json::to_string(&payload) {
584                state.events.push_back(HostPushEvent {
585                    event: MESH_PRIVATE_RECEIPT.to_string(),
586                    payload_json,
587                });
588            }
589        }
590        true
591    }
592
593    fn subscribe_private_receipt(&self, capability: &str) -> bool {
594        self.state
595            .lock()
596            .unwrap_or_else(|e| e.into_inner())
597            .private_receipt_subscriptions
598            .insert(capability.to_string());
599        true
600    }
601
602    fn drain_events(&self) -> Vec<HostPushEvent> {
603        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
604        state.events.drain(..).collect()
605    }
606}
607
608#[cfg(test)]
609mod tests {
610    use super::*;
611    use crate::executor_contract::MeshControlMode;
612
613    #[test]
614    fn in_memory_runtime_publish_without_subscription_emits_no_events() {
615        let runtime = InMemoryMeshRuntime::new();
616        // Publish without subscribing first
617        assert!(runtime.publish_private_control(
618            "unsubscribed-capability",
619            MeshControlEnvelope {
620                mode: MeshControlMode::Encrypted,
621                data_base64: "AQID".into(),
622            },
623        ));
624        assert!(runtime.publish_private_receipt("unsubscribed-capability", "AQID"));
625        // No events should have been emitted (private events are not filtered
626        // by emit_loopback_events — they are always emitted when subscribed).
627        // Since we did not subscribe, no events should be in the queue.
628        assert!(runtime.drain_events().is_empty());
629    }
630
631    #[test]
632    fn in_memory_runtime_returns_structured_object_results() {
633        let runtime = InMemoryMeshRuntime::new();
634        assert!(runtime.put_object(
635            "mesh/object/1",
636            "AQID",
637            MeshObjectPolicy {
638                expires_at_ms: Some(u64::MAX),
639                suppress_previews: Some(true),
640            },
641        ));
642
643        assert_eq!(
644            runtime.get_object_result("mesh/object/1"),
645            MeshObjectResult::found("AQID".into(), Some(u64::MAX))
646        );
647        assert_eq!(runtime.get_object("mesh/object/1").as_deref(), Some("AQID"));
648        assert!(runtime.has_suppress_previews("mesh/object/1"));
649    }
650
651    #[test]
652    fn in_memory_runtime_denies_expired_object_reads() {
653        let runtime = InMemoryMeshRuntime::new();
654        assert!(runtime.put_object(
655            "mesh/object/expired",
656            "AQID",
657            MeshObjectPolicy {
658                expires_at_ms: Some(1),
659                suppress_previews: None,
660            },
661        ));
662
663        assert_eq!(
664            runtime.get_object_result("mesh/object/expired"),
665            MeshObjectResult::unavailable(MeshObjectReadReason::Expired, Some(1))
666        );
667        assert_eq!(runtime.get_object("mesh/object/expired"), None);
668    }
669
670    #[test]
671    fn in_memory_runtime_reports_missing_object() {
672        let runtime = InMemoryMeshRuntime::new();
673
674        assert_eq!(
675            runtime.get_object_result("mesh/object/missing"),
676            MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None)
677        );
678        assert_eq!(runtime.get_object("mesh/object/missing"), None);
679    }
680
681    #[test]
682    fn test_respond_with_unknown_request_id_returns_false() {
683        let runtime = InMemoryMeshRuntime::new();
684        assert!(
685            !runtime.respond("mesh-req-nonexistent-99", "AQID"),
686            "respond with unknown request_id must return false"
687        );
688    }
689
690    #[test]
691    fn in_memory_runtime_supports_private_object_reads_and_repairs() {
692        let runtime = InMemoryMeshRuntime::new();
693        let reference = runtime.put_private_object(
694            "AQID",
695            MeshObjectPolicy {
696                expires_at_ms: Some(u64::MAX),
697                suppress_previews: Some(true),
698            },
699        );
700
701        assert_eq!(
702            runtime.get_private_object_result(&reference.handle, &reference.capability),
703            MeshObjectResult::found("AQID".into(), Some(u64::MAX))
704        );
705
706        let request_id = runtime.request_private(&reference.handle, &reference.capability, "");
707        let events = runtime.drain_events();
708        assert_eq!(events.len(), 1);
709        assert_eq!(events[0].event, "meshReply");
710        assert_eq!(
711            events[0].payload_json,
712            format!(
713                r#"{{"requestId":"{request_id}","dataBase64":"AQID","expiresAtMs":18446744073709551615}}"#
714            )
715        );
716    }
717
718    #[test]
719    fn test_two_in_memory_runtimes_generate_non_colliding_request_ids() {
720        let runtime_a = InMemoryMeshRuntime::new();
721        let runtime_b = InMemoryMeshRuntime::new();
722
723        let id_a = runtime_a.request("mesh/object/1", "");
724        let id_b = runtime_b.request("mesh/object/1", "");
725
726        assert_ne!(id_a, id_b, "different runtimes at same counter must differ");
727
728        let nonce_a = id_a
729            .split('-')
730            .nth(2)
731            .expect("nonce segment must be present");
732        let nonce_b = id_b
733            .split('-')
734            .nth(2)
735            .expect("nonce segment must be present");
736        assert_ne!(nonce_a, nonce_b, "session nonces must differ");
737    }
738
739    #[test]
740    fn in_memory_runtime_denies_guessed_private_access() {
741        let runtime = InMemoryMeshRuntime::new();
742        let reference = runtime.put_private_object("AQID", MeshObjectPolicy::default());
743
744        assert_eq!(
745            runtime.get_private_object_result(&reference.handle, "wrong-capability"),
746            MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
747        );
748        assert_eq!(
749            runtime.get_private_object_result("guessed-handle", "guessed-capability"),
750            MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
751        );
752    }
753
754    #[test]
755    fn in_memory_runtime_expires_private_capabilities() {
756        let runtime = InMemoryMeshRuntime::new();
757        let reference = runtime.put_private_object(
758            "AQID",
759            MeshObjectPolicy {
760                expires_at_ms: Some(1),
761                suppress_previews: None,
762            },
763        );
764
765        assert_eq!(
766            runtime.get_private_object_result(&reference.handle, &reference.capability),
767            MeshObjectResult::unavailable(MeshObjectReadReason::Expired, Some(1))
768        );
769
770        let request_id = runtime.request_private(&reference.handle, &reference.capability, "");
771        let events = runtime.drain_events();
772        assert_eq!(
773            events[0].payload_json,
774            format!(
775                r#"{{"requestId":"{request_id}","dataBase64":null,"reason":"expired","expiresAtMs":1}}"#
776            )
777        );
778    }
779
780    // ── Fix #129: cache-hit path must emit meshReply ──────────────────────
781
782    #[test]
783    fn in_memory_runtime_emits_mesh_reply_on_cache_hit() {
784        let runtime = InMemoryMeshRuntime::new();
785        assert!(runtime.put_object(
786            "mesh/object/1",
787            "BAUG",
788            MeshObjectPolicy {
789                expires_at_ms: Some(u64::MAX),
790                suppress_previews: None,
791            },
792        ));
793
794        let request_id = runtime.request("mesh/object/1", "AQID");
795        let events = runtime.drain_events();
796        assert_eq!(events.len(), 1, "cache hit must emit exactly one meshReply");
797        assert_eq!(events[0].event, "meshReply");
798        assert_eq!(
799            events[0].payload_json,
800            format!(
801                r#"{{"requestId":"{request_id}","dataBase64":"BAUG","expiresAtMs":18446744073709551615}}"#
802            )
803        );
804        assert_eq!(
805            runtime.status().pending_queries,
806            0,
807            "pending must be cleared after cache-hit auto-resolve"
808        );
809    }
810
811    #[test]
812    fn in_memory_runtime_emits_mesh_reply_for_not_found() {
813        let runtime = InMemoryMeshRuntime::new();
814        let request_id = runtime.request("mesh/object/missing", "AQID");
815        let events = runtime.drain_events();
816        assert_eq!(events.len(), 1, "must emit meshReply even for not-found");
817        assert_eq!(events[0].event, "meshReply");
818        assert_eq!(
819            events[0].payload_json,
820            format!(r#"{{"requestId":"{request_id}","dataBase64":null,"reason":"not_found"}}"#)
821        );
822    }
823
824    // ── Fix #128: loopback config emits meshQuery + meshTopic ─────────────
825
826    #[test]
827    fn in_memory_runtime_loopback_config_emits_topic_on_subscribed_publish() {
828        let runtime = InMemoryMeshRuntime::with_config(InMemoryMeshConfig::loopback());
829        assert!(runtime.subscribe("room/1"));
830        assert!(runtime.publish("room/1", "AQID"));
831
832        let events = runtime.drain_events();
833        assert_eq!(events.len(), 1);
834        assert_eq!(events[0].event, "meshTopic");
835        assert!(events[0].payload_json.contains("\"author\":\"loopback\""));
836    }
837
838    #[test]
839    fn in_memory_runtime_loopback_config_emits_query_and_reply_on_request() {
840        let runtime = InMemoryMeshRuntime::with_config(InMemoryMeshConfig::loopback());
841        let request_id = runtime.request("mesh/object/missing", "AQID");
842        let events = runtime.drain_events();
843        assert_eq!(
844            events.len(),
845            2,
846            "loopback must emit meshQuery then meshReply"
847        );
848        assert_eq!(events[0].event, "meshQuery");
849        assert_eq!(events[1].event, "meshReply");
850        assert_eq!(
851            events[0].payload_json,
852            format!(
853                r#"{{"requestId":"{request_id}","path":"mesh/object/missing","dataBase64":"AQID","author":"loopback"}}"#
854            )
855        );
856    }
857
858    #[test]
859    fn in_memory_runtime_loopback_config_emits_reply_on_respond() {
860        let runtime = InMemoryMeshRuntime::with_config(InMemoryMeshConfig::loopback());
861        // request() auto-resolves and clears pending, so re-insert manually.
862        let external_id = format!("external-req-{}", runtime.session_nonce());
863        runtime.insert_pending_query(&external_id);
864        let _ = runtime.drain_events(); // clear any prior events
865
866        assert!(runtime.respond(&external_id, "BAUG"));
867        let events = runtime.drain_events();
868        assert_eq!(
869            events.len(),
870            1,
871            "respond in loopback mode must emit meshReply"
872        );
873        assert_eq!(events[0].event, "meshReply");
874    }
875
876    #[test]
877    fn in_memory_runtime_loopback_status_reports_loopback_transport() {
878        let runtime = InMemoryMeshRuntime::with_config(InMemoryMeshConfig::loopback());
879        assert_eq!(runtime.status().transport, "loopback");
880    }
881
882    #[test]
883    fn in_memory_runtime_default_status_reports_in_memory_transport() {
884        let runtime = InMemoryMeshRuntime::new();
885        assert_eq!(runtime.status().transport, "in-memory");
886    }
887
888    // ── QA finding #6: combined request() → respond() returns false ──────
889
890    #[test]
891    fn in_memory_runtime_request_auto_resolves_so_respond_returns_false() {
892        let runtime = InMemoryMeshRuntime::new();
893        assert!(runtime.put_object(
894            "mesh/object/1",
895            "BAUG",
896            MeshObjectPolicy {
897                expires_at_ms: Some(u64::MAX),
898                suppress_previews: None,
899            },
900        ));
901
902        let request_id = runtime.request("mesh/object/1", "AQID");
903        // request() auto-resolved and cleared pending
904        assert!(
905            !runtime.respond(&request_id, "BAUG"),
906            "respond after auto-resolved request must return false"
907        );
908        // Only the meshReply from request() should be in the queue
909        let events = runtime.drain_events();
910        assert_eq!(events.len(), 1, "only the auto-reply from request()");
911        assert_eq!(events[0].event, "meshReply");
912    }
913}