Skip to main content

host_extensions/first_party/
mesh_runtime.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{
3    atomic::{AtomicU64, Ordering},
4    Mutex,
5};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use crate::{
9    events::{
10        MeshPrivateControlPayload, MeshPrivateReceiptPayload, MeshReplyPayload,
11        MESH_PRIVATE_CONTROL, MESH_PRIVATE_RECEIPT, MESH_REPLY,
12    },
13    executor_contract::{
14        MeshControlEnvelope, MeshObjectPolicy, MeshObjectReadReason, MeshObjectResult,
15        MeshPrivateObjectRef, MeshStatusResult,
16    },
17    HostPushEvent,
18};
19
20/// Generate a random 32-character hex nonce for session-unique request IDs.
21///
22/// 16 bytes (128 bits of entropy) keeps cross-run request ID collisions
23/// negligible even over very large numbers of sequential restarts.
24///
25/// # Fallback
26///
27/// When `getrandom` fails (e.g. early WASM boot without the `js` feature),
28/// falls back to time + stack address + monotonic counter. This is NOT
29/// cryptographically secure but is sufficient to prevent request ID reuse
30/// across sequential restarts. On all supported platforms (native + WASM
31/// with `js` feature), `getrandom` should always succeed and the fallback
32/// should be unreachable.
33pub fn generate_session_nonce() -> String {
34    use std::sync::atomic::{AtomicU64, Ordering};
35    static FALLBACK_CTR: AtomicU64 = AtomicU64::new(0);
36
37    let mut bytes = [0u8; 16];
38    if getrandom::getrandom(&mut bytes).is_err() {
39        log::warn!("getrandom failed — using fallback nonce (not cryptographically secure)");
40        let t = SystemTime::now()
41            .duration_since(UNIX_EPOCH)
42            .unwrap_or_default()
43            .as_nanos()
44            .to_le_bytes();
45        let ctr = FALLBACK_CTR.fetch_add(1, Ordering::Relaxed).to_le_bytes();
46        bytes[..8].copy_from_slice(&t[..8]);
47        bytes[8..].copy_from_slice(&ctr);
48    }
49    bytes.iter().map(|b| format!("{b:02x}")).collect()
50}
51
52pub trait MeshRuntime: Send + Sync + 'static {
53    fn publish(&self, topic: &str, data_base64: &str) -> bool;
54    fn subscribe(&self, topic: &str) -> bool;
55    fn put_object(&self, path: &str, data_base64: &str, policy: MeshObjectPolicy) -> bool;
56    fn get_object_result(&self, path: &str) -> MeshObjectResult;
57    fn get_object(&self, path: &str) -> Option<String> {
58        self.get_object_result(path).compat_data()
59    }
60    fn request(&self, path: &str, data_base64: &str) -> String;
61    fn respond(&self, request_id: &str, data_base64: &str) -> bool;
62    fn status(&self) -> MeshStatusResult;
63    fn supports_private(&self) -> bool {
64        false
65    }
66    fn put_private_object(
67        &self,
68        _data_base64: &str,
69        _policy: MeshObjectPolicy,
70    ) -> MeshPrivateObjectRef {
71        MeshPrivateObjectRef {
72            handle: String::new(),
73            capability: String::new(),
74            expires_at_ms: None,
75        }
76    }
77    fn get_private_object_result(&self, _handle: &str, _capability: &str) -> MeshObjectResult {
78        MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
79    }
80    fn request_private(&self, _handle: &str, _capability: &str, _data_base64: &str) -> String {
81        String::new()
82    }
83    fn publish_private_control(&self, _capability: &str, _envelope: MeshControlEnvelope) -> bool {
84        false
85    }
86    fn subscribe_private_control(&self, _capability: &str) -> bool {
87        false
88    }
89    fn publish_private_receipt(&self, _capability: &str, _data_base64: &str) -> bool {
90        false
91    }
92    fn subscribe_private_receipt(&self, _capability: &str) -> bool {
93        false
94    }
95    fn drain_events(&self) -> Vec<HostPushEvent> {
96        Vec::new()
97    }
98}
99
100#[derive(Debug)]
101pub struct InMemoryMeshRuntime {
102    session_nonce: String,
103    next_request_id: AtomicU64,
104    next_private_id: AtomicU64,
105    state: Mutex<InMemoryMeshState>,
106}
107
108#[derive(Debug, Default)]
109struct InMemoryMeshState {
110    subscriptions: HashSet<String>,
111    objects: HashMap<String, StoredMeshObject>,
112    private_objects: HashMap<String, StoredPrivateMeshObject>,
113    private_control_subscriptions: HashSet<String>,
114    private_receipt_subscriptions: HashSet<String>,
115    pending_queries: HashSet<String>,
116    events: Vec<HostPushEvent>,
117}
118
119// ── Wire types for statement-store transport ────────────────────────────
120
121pub const MESH_QUERY_CHANNEL: &str = "mesh/query";
122pub const MESH_REPLY_CHANNEL: &str = "mesh/reply";
123pub const MESH_PRIVATE_QUERY_CHANNEL: &str = "mesh/private/query";
124pub const MESH_PRIVATE_CONTROL_CHANNEL: &str = "mesh/private/control";
125pub const MESH_PRIVATE_RECEIPT_CHANNEL: &str = "mesh/private/receipt";
126
127#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
128#[serde(rename_all = "camelCase")]
129pub struct MeshWireQuery {
130    pub request_id: String,
131    pub path: String,
132    pub data_base64: String,
133}
134
135#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
136#[serde(rename_all = "camelCase")]
137pub struct MeshPrivateWireQuery {
138    pub request_id: String,
139    pub handle: String,
140    pub capability: String,
141    pub data_base64: String,
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct MeshWireReply {
147    pub request_id: String,
148    pub data_base64: Option<String>,
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pub reason: Option<MeshObjectReadReason>,
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub expires_at_ms: Option<u64>,
153}
154
155#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
156#[serde(rename_all = "camelCase")]
157pub struct MeshPrivateControlWireMessage {
158    pub capability: String,
159    pub envelope: MeshControlEnvelope,
160}
161
162#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
163#[serde(rename_all = "camelCase")]
164pub struct MeshPrivateReceiptWireMessage {
165    pub capability: String,
166    pub data_base64: String,
167}
168
169#[derive(Debug, Clone)]
170pub struct StoredMeshObject {
171    pub data_base64: String,
172    pub expires_at_ms: Option<u64>,
173    pub suppress_previews: bool,
174}
175
176#[derive(Debug, Clone)]
177pub struct StoredPrivateMeshObject {
178    pub capability: String,
179    pub object: StoredMeshObject,
180}
181
182impl StoredMeshObject {
183    pub fn from_put(data_base64: &str, policy: MeshObjectPolicy) -> Self {
184        Self {
185            data_base64: data_base64.to_string(),
186            expires_at_ms: policy.expires_at_ms,
187            suppress_previews: policy.suppress_previews.unwrap_or(false),
188        }
189    }
190
191    pub fn as_result(&self, now_ms: u64) -> MeshObjectResult {
192        // These runtimes do not synthesize previews, but they persist the
193        // effective policy so compliant hosts can honor suppression without
194        // changing the object-store contract.
195        let _preview_suppressed = self.suppress_previews;
196        if self
197            .expires_at_ms
198            .is_some_and(|expires_at_ms| expires_at_ms <= now_ms)
199        {
200            return MeshObjectResult::unavailable(
201                MeshObjectReadReason::Expired,
202                self.expires_at_ms,
203            );
204        }
205
206        MeshObjectResult::found(self.data_base64.clone(), self.expires_at_ms)
207    }
208}
209
210pub fn private_object_result(
211    entry: Option<&StoredPrivateMeshObject>,
212    capability: &str,
213    now_ms: u64,
214) -> MeshObjectResult {
215    let Some(entry) = entry else {
216        return MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None);
217    };
218    if entry.capability != capability {
219        return MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None);
220    }
221    entry.object.as_result(now_ms)
222}
223
224pub fn current_time_ms() -> u64 {
225    SystemTime::now()
226        .duration_since(UNIX_EPOCH)
227        .unwrap_or_default()
228        .as_millis()
229        .min(u128::from(u64::MAX)) as u64
230}
231
232impl Default for InMemoryMeshRuntime {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
238impl InMemoryMeshRuntime {
239    pub fn new() -> Self {
240        Self {
241            session_nonce: generate_session_nonce(),
242            next_request_id: AtomicU64::new(1),
243            next_private_id: AtomicU64::new(1),
244            state: Mutex::new(InMemoryMeshState::default()),
245        }
246    }
247}
248
249impl MeshRuntime for InMemoryMeshRuntime {
250    fn publish(&self, _topic: &str, _data_base64: &str) -> bool {
251        true
252    }
253
254    fn subscribe(&self, topic: &str) -> bool {
255        self.state
256            .lock()
257            .unwrap_or_else(|e| e.into_inner())
258            .subscriptions
259            .insert(topic.to_string());
260        true
261    }
262
263    fn put_object(&self, path: &str, data_base64: &str, policy: MeshObjectPolicy) -> bool {
264        self.state
265            .lock()
266            .unwrap_or_else(|e| e.into_inner())
267            .objects
268            .insert(
269                path.to_string(),
270                StoredMeshObject::from_put(data_base64, policy),
271            );
272        true
273    }
274
275    fn get_object_result(&self, path: &str) -> MeshObjectResult {
276        self.state
277            .lock()
278            .unwrap_or_else(|e| e.into_inner())
279            .objects
280            .get(path)
281            .map(|object| object.as_result(current_time_ms()))
282            .unwrap_or_else(|| MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None))
283    }
284
285    fn request(&self, _path: &str, _data_base64: &str) -> String {
286        let request_id = format!(
287            "mesh-req-{}-{}",
288            self.session_nonce,
289            self.next_request_id.fetch_add(1, Ordering::Relaxed)
290        );
291        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
292        state.pending_queries.insert(request_id.clone());
293        if state
294            .objects
295            .get(_path)
296            .map(|object| object.as_result(current_time_ms()).data_base64.is_some())
297            .unwrap_or(false)
298        {
299            // Known gap: cache-hit path removes pending without emitting
300            // meshReply, unlike the loopback runtime. Deferred — this is
301            // pre-existing InMemoryMeshRuntime behavior, not part of #117.
302            state.pending_queries.remove(&request_id);
303        }
304        request_id
305    }
306
307    /// Marks a pending query as answered. Returns `true` if the request was pending.
308    ///
309    /// NOTE: Unlike `LoopbackMeshRuntime` and `StatementStoreMeshRuntime`,
310    /// this implementation does NOT emit a `meshReply` event and ignores
311    /// `data_base64`. This is a known asymmetry — the InMemoryMeshRuntime
312    /// is for unit testing, not production mesh transport.
313    fn respond(&self, request_id: &str, _data_base64: &str) -> bool {
314        self.state
315            .lock()
316            .unwrap_or_else(|e| e.into_inner())
317            .pending_queries
318            .remove(request_id)
319    }
320
321    fn status(&self) -> MeshStatusResult {
322        let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
323        MeshStatusResult {
324            health: "healthy".to_string(),
325            transport: "custom".to_string(),
326            pending_publishes: 0,
327            pending_queries: state.pending_queries.len() as u64,
328            last_error: None,
329        }
330    }
331
332    fn supports_private(&self) -> bool {
333        true
334    }
335
336    fn put_private_object(
337        &self,
338        data_base64: &str,
339        policy: MeshObjectPolicy,
340    ) -> MeshPrivateObjectRef {
341        let id = self.next_private_id.fetch_add(1, Ordering::Relaxed);
342        let handle = format!("mesh-private-handle-{id}");
343        let capability = format!("mesh-private-capability-{id}");
344        let expires_at_ms = policy.expires_at_ms;
345        self.state
346            .lock()
347            .unwrap_or_else(|e| e.into_inner())
348            .private_objects
349            .insert(
350                handle.clone(),
351                StoredPrivateMeshObject {
352                    capability: capability.clone(),
353                    object: StoredMeshObject::from_put(data_base64, policy),
354                },
355            );
356        MeshPrivateObjectRef {
357            handle,
358            capability,
359            expires_at_ms,
360        }
361    }
362
363    fn get_private_object_result(&self, handle: &str, capability: &str) -> MeshObjectResult {
364        let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
365        private_object_result(
366            state.private_objects.get(handle),
367            capability,
368            current_time_ms(),
369        )
370    }
371
372    fn request_private(&self, handle: &str, capability: &str, _data_base64: &str) -> String {
373        let request_id = format!(
374            "mesh-private-req-{}",
375            self.next_request_id.fetch_add(1, Ordering::Relaxed)
376        );
377        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
378        state.pending_queries.insert(request_id.clone());
379        let result = private_object_result(
380            state.private_objects.get(handle),
381            capability,
382            current_time_ms(),
383        );
384        state.pending_queries.remove(&request_id);
385        let payload = MeshReplyPayload {
386            request_id: request_id.clone(),
387            data_base64: result.data_base64,
388            reason: result.reason,
389            expires_at_ms: result.expires_at_ms,
390            author: None,
391        };
392        if let Ok(payload_json) = serde_json::to_string(&payload) {
393            state.events.push(HostPushEvent {
394                event: MESH_REPLY.to_string(),
395                payload_json,
396            });
397        }
398        request_id
399    }
400
401    fn publish_private_control(&self, capability: &str, envelope: MeshControlEnvelope) -> bool {
402        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
403        if state.private_control_subscriptions.contains(capability) {
404            let payload = MeshPrivateControlPayload {
405                capability: capability.to_string(),
406                envelope,
407                author: None,
408            };
409            if let Ok(payload_json) = serde_json::to_string(&payload) {
410                state.events.push(HostPushEvent {
411                    event: MESH_PRIVATE_CONTROL.to_string(),
412                    payload_json,
413                });
414            }
415        }
416        true
417    }
418
419    fn subscribe_private_control(&self, capability: &str) -> bool {
420        self.state
421            .lock()
422            .unwrap_or_else(|e| e.into_inner())
423            .private_control_subscriptions
424            .insert(capability.to_string());
425        true
426    }
427
428    fn publish_private_receipt(&self, capability: &str, data_base64: &str) -> bool {
429        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
430        if state.private_receipt_subscriptions.contains(capability) {
431            let payload = MeshPrivateReceiptPayload {
432                capability: capability.to_string(),
433                data_base64: data_base64.to_string(),
434                author: None,
435            };
436            if let Ok(payload_json) = serde_json::to_string(&payload) {
437                state.events.push(HostPushEvent {
438                    event: MESH_PRIVATE_RECEIPT.to_string(),
439                    payload_json,
440                });
441            }
442        }
443        true
444    }
445
446    fn subscribe_private_receipt(&self, capability: &str) -> bool {
447        self.state
448            .lock()
449            .unwrap_or_else(|e| e.into_inner())
450            .private_receipt_subscriptions
451            .insert(capability.to_string());
452        true
453    }
454
455    fn drain_events(&self) -> Vec<HostPushEvent> {
456        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
457        state.events.drain(..).collect()
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use super::*;
464    use crate::executor_contract::MeshControlMode;
465
466    #[test]
467    fn in_memory_runtime_publish_without_subscription_emits_no_events() {
468        let runtime = InMemoryMeshRuntime::new();
469        // Publish without subscribing first
470        assert!(runtime.publish_private_control(
471            "unsubscribed-capability",
472            MeshControlEnvelope {
473                mode: MeshControlMode::Encrypted,
474                data_base64: "AQID".into(),
475            },
476        ));
477        assert!(runtime.publish_private_receipt("unsubscribed-capability", "AQID"));
478        // No events should have been emitted
479        assert!(runtime.drain_events().is_empty());
480    }
481
482    #[test]
483    fn in_memory_runtime_returns_structured_object_results() {
484        let runtime = InMemoryMeshRuntime::new();
485        assert!(runtime.put_object(
486            "mesh/object/1",
487            "AQID",
488            MeshObjectPolicy {
489                expires_at_ms: Some(u64::MAX),
490                suppress_previews: Some(true),
491            },
492        ));
493
494        assert_eq!(
495            runtime.get_object_result("mesh/object/1"),
496            MeshObjectResult::found("AQID".into(), Some(u64::MAX))
497        );
498        assert_eq!(runtime.get_object("mesh/object/1").as_deref(), Some("AQID"));
499        assert!(runtime
500            .state
501            .lock()
502            .unwrap_or_else(|e| e.into_inner())
503            .objects
504            .get("mesh/object/1")
505            .map(|object| object.suppress_previews)
506            .unwrap_or(false));
507    }
508
509    #[test]
510    fn in_memory_runtime_denies_expired_object_reads() {
511        let runtime = InMemoryMeshRuntime::new();
512        assert!(runtime.put_object(
513            "mesh/object/expired",
514            "AQID",
515            MeshObjectPolicy {
516                expires_at_ms: Some(1),
517                suppress_previews: None,
518            },
519        ));
520
521        assert_eq!(
522            runtime.get_object_result("mesh/object/expired"),
523            MeshObjectResult::unavailable(MeshObjectReadReason::Expired, Some(1))
524        );
525        assert_eq!(runtime.get_object("mesh/object/expired"), None);
526    }
527
528    #[test]
529    fn in_memory_runtime_reports_missing_object() {
530        let runtime = InMemoryMeshRuntime::new();
531
532        assert_eq!(
533            runtime.get_object_result("mesh/object/missing"),
534            MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None)
535        );
536        assert_eq!(runtime.get_object("mesh/object/missing"), None);
537    }
538
539    #[test]
540    fn test_respond_with_unknown_request_id_returns_false() {
541        let runtime = InMemoryMeshRuntime::new();
542        assert!(
543            !runtime.respond("mesh-req-nonexistent-99", "AQID"),
544            "respond with unknown request_id must return false"
545        );
546    }
547
548    #[test]
549    fn in_memory_runtime_supports_private_object_reads_and_repairs() {
550        let runtime = InMemoryMeshRuntime::new();
551        let reference = runtime.put_private_object(
552            "AQID",
553            MeshObjectPolicy {
554                expires_at_ms: Some(u64::MAX),
555                suppress_previews: Some(true),
556            },
557        );
558
559        assert_eq!(
560            runtime.get_private_object_result(&reference.handle, &reference.capability),
561            MeshObjectResult::found("AQID".into(), Some(u64::MAX))
562        );
563
564        let request_id = runtime.request_private(&reference.handle, &reference.capability, "");
565        let events = runtime.drain_events();
566        assert_eq!(events.len(), 1);
567        assert_eq!(events[0].event, "meshReply");
568        assert_eq!(
569            events[0].payload_json,
570            format!(
571                r#"{{"requestId":"{request_id}","dataBase64":"AQID","expiresAtMs":18446744073709551615}}"#
572            )
573        );
574    }
575
576    #[test]
577    fn test_two_in_memory_runtimes_generate_non_colliding_request_ids() {
578        let runtime_a = InMemoryMeshRuntime::new();
579        let runtime_b = InMemoryMeshRuntime::new();
580
581        let id_a = runtime_a.request("mesh/object/1", "");
582        let id_b = runtime_b.request("mesh/object/1", "");
583
584        assert_ne!(id_a, id_b, "different runtimes at same counter must differ");
585
586        let nonce_a = id_a
587            .split('-')
588            .nth(2)
589            .expect("nonce segment must be present");
590        let nonce_b = id_b
591            .split('-')
592            .nth(2)
593            .expect("nonce segment must be present");
594        assert_ne!(nonce_a, nonce_b, "session nonces must differ");
595    }
596
597    #[test]
598    fn in_memory_runtime_denies_guessed_private_access() {
599        let runtime = InMemoryMeshRuntime::new();
600        let reference = runtime.put_private_object("AQID", MeshObjectPolicy::default());
601
602        assert_eq!(
603            runtime.get_private_object_result(&reference.handle, "wrong-capability"),
604            MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
605        );
606        assert_eq!(
607            runtime.get_private_object_result("guessed-handle", "guessed-capability"),
608            MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
609        );
610    }
611
612    #[test]
613    fn in_memory_runtime_expires_private_capabilities() {
614        let runtime = InMemoryMeshRuntime::new();
615        let reference = runtime.put_private_object(
616            "AQID",
617            MeshObjectPolicy {
618                expires_at_ms: Some(1),
619                suppress_previews: None,
620            },
621        );
622
623        assert_eq!(
624            runtime.get_private_object_result(&reference.handle, &reference.capability),
625            MeshObjectResult::unavailable(MeshObjectReadReason::Expired, Some(1))
626        );
627
628        let request_id = runtime.request_private(&reference.handle, &reference.capability, "");
629        let events = runtime.drain_events();
630        assert_eq!(
631            events[0].payload_json,
632            format!(
633                r#"{{"requestId":"{request_id}","dataBase64":null,"reason":"expired","expiresAtMs":1}}"#
634            )
635        );
636    }
637}