1use std::collections::{HashMap, HashSet};
2use std::sync::{
3 atomic::{AtomicU64, Ordering},
4 Mutex,
5};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use crate::{
9 events::{
10 MeshPrivateControlPayload, MeshPrivateReceiptPayload, MeshReplyPayload,
11 MESH_PRIVATE_CONTROL, MESH_PRIVATE_RECEIPT, MESH_REPLY,
12 },
13 executor_contract::{
14 MeshControlEnvelope, MeshObjectPolicy, MeshObjectReadReason, MeshObjectResult,
15 MeshPrivateObjectRef, MeshStatusResult,
16 },
17 HostPushEvent,
18};
19
20pub fn generate_session_nonce() -> String {
34 use std::sync::atomic::{AtomicU64, Ordering};
35 static FALLBACK_CTR: AtomicU64 = AtomicU64::new(0);
36
37 let mut bytes = [0u8; 16];
38 if getrandom::getrandom(&mut bytes).is_err() {
39 log::warn!("getrandom failed — using fallback nonce (not cryptographically secure)");
40 let t = SystemTime::now()
41 .duration_since(UNIX_EPOCH)
42 .unwrap_or_default()
43 .as_nanos()
44 .to_le_bytes();
45 let ctr = FALLBACK_CTR.fetch_add(1, Ordering::Relaxed).to_le_bytes();
46 bytes[..8].copy_from_slice(&t[..8]);
47 bytes[8..].copy_from_slice(&ctr);
48 }
49 bytes.iter().map(|b| format!("{b:02x}")).collect()
50}
51
52pub trait MeshRuntime: Send + Sync + 'static {
53 fn publish(&self, topic: &str, data_base64: &str) -> bool;
54 fn subscribe(&self, topic: &str) -> bool;
55 fn put_object(&self, path: &str, data_base64: &str, policy: MeshObjectPolicy) -> bool;
56 fn get_object_result(&self, path: &str) -> MeshObjectResult;
57 fn get_object(&self, path: &str) -> Option<String> {
58 self.get_object_result(path).compat_data()
59 }
60 fn request(&self, path: &str, data_base64: &str) -> String;
61 fn respond(&self, request_id: &str, data_base64: &str) -> bool;
62 fn status(&self) -> MeshStatusResult;
63 fn supports_private(&self) -> bool {
64 false
65 }
66 fn put_private_object(
67 &self,
68 _data_base64: &str,
69 _policy: MeshObjectPolicy,
70 ) -> MeshPrivateObjectRef {
71 MeshPrivateObjectRef {
72 handle: String::new(),
73 capability: String::new(),
74 expires_at_ms: None,
75 }
76 }
77 fn get_private_object_result(&self, _handle: &str, _capability: &str) -> MeshObjectResult {
78 MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
79 }
80 fn request_private(&self, _handle: &str, _capability: &str, _data_base64: &str) -> String {
81 String::new()
82 }
83 fn publish_private_control(&self, _capability: &str, _envelope: MeshControlEnvelope) -> bool {
84 false
85 }
86 fn subscribe_private_control(&self, _capability: &str) -> bool {
87 false
88 }
89 fn publish_private_receipt(&self, _capability: &str, _data_base64: &str) -> bool {
90 false
91 }
92 fn subscribe_private_receipt(&self, _capability: &str) -> bool {
93 false
94 }
95 fn drain_events(&self) -> Vec<HostPushEvent> {
96 Vec::new()
97 }
98}
99
100#[derive(Debug)]
101pub struct InMemoryMeshRuntime {
102 session_nonce: String,
103 next_request_id: AtomicU64,
104 next_private_id: AtomicU64,
105 state: Mutex<InMemoryMeshState>,
106}
107
108#[derive(Debug, Default)]
109struct InMemoryMeshState {
110 subscriptions: HashSet<String>,
111 objects: HashMap<String, StoredMeshObject>,
112 private_objects: HashMap<String, StoredPrivateMeshObject>,
113 private_control_subscriptions: HashSet<String>,
114 private_receipt_subscriptions: HashSet<String>,
115 pending_queries: HashSet<String>,
116 events: Vec<HostPushEvent>,
117}
118
119pub const MESH_QUERY_CHANNEL: &str = "mesh/query";
122pub const MESH_REPLY_CHANNEL: &str = "mesh/reply";
123pub const MESH_PRIVATE_QUERY_CHANNEL: &str = "mesh/private/query";
124pub const MESH_PRIVATE_CONTROL_CHANNEL: &str = "mesh/private/control";
125pub const MESH_PRIVATE_RECEIPT_CHANNEL: &str = "mesh/private/receipt";
126
127#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
128#[serde(rename_all = "camelCase")]
129pub struct MeshWireQuery {
130 pub request_id: String,
131 pub path: String,
132 pub data_base64: String,
133}
134
135#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
136#[serde(rename_all = "camelCase")]
137pub struct MeshPrivateWireQuery {
138 pub request_id: String,
139 pub handle: String,
140 pub capability: String,
141 pub data_base64: String,
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct MeshWireReply {
147 pub request_id: String,
148 pub data_base64: Option<String>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 pub reason: Option<MeshObjectReadReason>,
151 #[serde(skip_serializing_if = "Option::is_none")]
152 pub expires_at_ms: Option<u64>,
153}
154
155#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
156#[serde(rename_all = "camelCase")]
157pub struct MeshPrivateControlWireMessage {
158 pub capability: String,
159 pub envelope: MeshControlEnvelope,
160}
161
162#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
163#[serde(rename_all = "camelCase")]
164pub struct MeshPrivateReceiptWireMessage {
165 pub capability: String,
166 pub data_base64: String,
167}
168
169#[derive(Debug, Clone)]
170pub struct StoredMeshObject {
171 pub data_base64: String,
172 pub expires_at_ms: Option<u64>,
173 pub suppress_previews: bool,
174}
175
176#[derive(Debug, Clone)]
177pub struct StoredPrivateMeshObject {
178 pub capability: String,
179 pub object: StoredMeshObject,
180}
181
182impl StoredMeshObject {
183 pub fn from_put(data_base64: &str, policy: MeshObjectPolicy) -> Self {
184 Self {
185 data_base64: data_base64.to_string(),
186 expires_at_ms: policy.expires_at_ms,
187 suppress_previews: policy.suppress_previews.unwrap_or(false),
188 }
189 }
190
191 pub fn as_result(&self, now_ms: u64) -> MeshObjectResult {
192 let _preview_suppressed = self.suppress_previews;
196 if self
197 .expires_at_ms
198 .is_some_and(|expires_at_ms| expires_at_ms <= now_ms)
199 {
200 return MeshObjectResult::unavailable(
201 MeshObjectReadReason::Expired,
202 self.expires_at_ms,
203 );
204 }
205
206 MeshObjectResult::found(self.data_base64.clone(), self.expires_at_ms)
207 }
208}
209
210pub fn private_object_result(
211 entry: Option<&StoredPrivateMeshObject>,
212 capability: &str,
213 now_ms: u64,
214) -> MeshObjectResult {
215 let Some(entry) = entry else {
216 return MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None);
217 };
218 if entry.capability != capability {
219 return MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None);
220 }
221 entry.object.as_result(now_ms)
222}
223
224pub fn current_time_ms() -> u64 {
225 SystemTime::now()
226 .duration_since(UNIX_EPOCH)
227 .unwrap_or_default()
228 .as_millis()
229 .min(u128::from(u64::MAX)) as u64
230}
231
232impl Default for InMemoryMeshRuntime {
233 fn default() -> Self {
234 Self::new()
235 }
236}
237
238impl InMemoryMeshRuntime {
239 pub fn new() -> Self {
240 Self {
241 session_nonce: generate_session_nonce(),
242 next_request_id: AtomicU64::new(1),
243 next_private_id: AtomicU64::new(1),
244 state: Mutex::new(InMemoryMeshState::default()),
245 }
246 }
247}
248
249impl MeshRuntime for InMemoryMeshRuntime {
250 fn publish(&self, _topic: &str, _data_base64: &str) -> bool {
251 true
252 }
253
254 fn subscribe(&self, topic: &str) -> bool {
255 self.state
256 .lock()
257 .unwrap_or_else(|e| e.into_inner())
258 .subscriptions
259 .insert(topic.to_string());
260 true
261 }
262
263 fn put_object(&self, path: &str, data_base64: &str, policy: MeshObjectPolicy) -> bool {
264 self.state
265 .lock()
266 .unwrap_or_else(|e| e.into_inner())
267 .objects
268 .insert(
269 path.to_string(),
270 StoredMeshObject::from_put(data_base64, policy),
271 );
272 true
273 }
274
275 fn get_object_result(&self, path: &str) -> MeshObjectResult {
276 self.state
277 .lock()
278 .unwrap_or_else(|e| e.into_inner())
279 .objects
280 .get(path)
281 .map(|object| object.as_result(current_time_ms()))
282 .unwrap_or_else(|| MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None))
283 }
284
285 fn request(&self, _path: &str, _data_base64: &str) -> String {
286 let request_id = format!(
287 "mesh-req-{}-{}",
288 self.session_nonce,
289 self.next_request_id.fetch_add(1, Ordering::Relaxed)
290 );
291 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
292 state.pending_queries.insert(request_id.clone());
293 if state
294 .objects
295 .get(_path)
296 .map(|object| object.as_result(current_time_ms()).data_base64.is_some())
297 .unwrap_or(false)
298 {
299 state.pending_queries.remove(&request_id);
303 }
304 request_id
305 }
306
307 fn respond(&self, request_id: &str, _data_base64: &str) -> bool {
314 self.state
315 .lock()
316 .unwrap_or_else(|e| e.into_inner())
317 .pending_queries
318 .remove(request_id)
319 }
320
321 fn status(&self) -> MeshStatusResult {
322 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
323 MeshStatusResult {
324 health: "healthy".to_string(),
325 transport: "custom".to_string(),
326 pending_publishes: 0,
327 pending_queries: state.pending_queries.len() as u64,
328 last_error: None,
329 }
330 }
331
332 fn supports_private(&self) -> bool {
333 true
334 }
335
336 fn put_private_object(
337 &self,
338 data_base64: &str,
339 policy: MeshObjectPolicy,
340 ) -> MeshPrivateObjectRef {
341 let id = self.next_private_id.fetch_add(1, Ordering::Relaxed);
342 let handle = format!("mesh-private-handle-{id}");
343 let capability = format!("mesh-private-capability-{id}");
344 let expires_at_ms = policy.expires_at_ms;
345 self.state
346 .lock()
347 .unwrap_or_else(|e| e.into_inner())
348 .private_objects
349 .insert(
350 handle.clone(),
351 StoredPrivateMeshObject {
352 capability: capability.clone(),
353 object: StoredMeshObject::from_put(data_base64, policy),
354 },
355 );
356 MeshPrivateObjectRef {
357 handle,
358 capability,
359 expires_at_ms,
360 }
361 }
362
363 fn get_private_object_result(&self, handle: &str, capability: &str) -> MeshObjectResult {
364 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
365 private_object_result(
366 state.private_objects.get(handle),
367 capability,
368 current_time_ms(),
369 )
370 }
371
372 fn request_private(&self, handle: &str, capability: &str, _data_base64: &str) -> String {
373 let request_id = format!(
374 "mesh-private-req-{}",
375 self.next_request_id.fetch_add(1, Ordering::Relaxed)
376 );
377 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
378 state.pending_queries.insert(request_id.clone());
379 let result = private_object_result(
380 state.private_objects.get(handle),
381 capability,
382 current_time_ms(),
383 );
384 state.pending_queries.remove(&request_id);
385 let payload = MeshReplyPayload {
386 request_id: request_id.clone(),
387 data_base64: result.data_base64,
388 reason: result.reason,
389 expires_at_ms: result.expires_at_ms,
390 author: None,
391 };
392 if let Ok(payload_json) = serde_json::to_string(&payload) {
393 state.events.push(HostPushEvent {
394 event: MESH_REPLY.to_string(),
395 payload_json,
396 });
397 }
398 request_id
399 }
400
401 fn publish_private_control(&self, capability: &str, envelope: MeshControlEnvelope) -> bool {
402 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
403 if state.private_control_subscriptions.contains(capability) {
404 let payload = MeshPrivateControlPayload {
405 capability: capability.to_string(),
406 envelope,
407 author: None,
408 };
409 if let Ok(payload_json) = serde_json::to_string(&payload) {
410 state.events.push(HostPushEvent {
411 event: MESH_PRIVATE_CONTROL.to_string(),
412 payload_json,
413 });
414 }
415 }
416 true
417 }
418
419 fn subscribe_private_control(&self, capability: &str) -> bool {
420 self.state
421 .lock()
422 .unwrap_or_else(|e| e.into_inner())
423 .private_control_subscriptions
424 .insert(capability.to_string());
425 true
426 }
427
428 fn publish_private_receipt(&self, capability: &str, data_base64: &str) -> bool {
429 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
430 if state.private_receipt_subscriptions.contains(capability) {
431 let payload = MeshPrivateReceiptPayload {
432 capability: capability.to_string(),
433 data_base64: data_base64.to_string(),
434 author: None,
435 };
436 if let Ok(payload_json) = serde_json::to_string(&payload) {
437 state.events.push(HostPushEvent {
438 event: MESH_PRIVATE_RECEIPT.to_string(),
439 payload_json,
440 });
441 }
442 }
443 true
444 }
445
446 fn subscribe_private_receipt(&self, capability: &str) -> bool {
447 self.state
448 .lock()
449 .unwrap_or_else(|e| e.into_inner())
450 .private_receipt_subscriptions
451 .insert(capability.to_string());
452 true
453 }
454
455 fn drain_events(&self) -> Vec<HostPushEvent> {
456 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
457 state.events.drain(..).collect()
458 }
459}
460
461#[cfg(test)]
462mod tests {
463 use super::*;
464 use crate::executor_contract::MeshControlMode;
465
466 #[test]
467 fn in_memory_runtime_publish_without_subscription_emits_no_events() {
468 let runtime = InMemoryMeshRuntime::new();
469 assert!(runtime.publish_private_control(
471 "unsubscribed-capability",
472 MeshControlEnvelope {
473 mode: MeshControlMode::Encrypted,
474 data_base64: "AQID".into(),
475 },
476 ));
477 assert!(runtime.publish_private_receipt("unsubscribed-capability", "AQID"));
478 assert!(runtime.drain_events().is_empty());
480 }
481
482 #[test]
483 fn in_memory_runtime_returns_structured_object_results() {
484 let runtime = InMemoryMeshRuntime::new();
485 assert!(runtime.put_object(
486 "mesh/object/1",
487 "AQID",
488 MeshObjectPolicy {
489 expires_at_ms: Some(u64::MAX),
490 suppress_previews: Some(true),
491 },
492 ));
493
494 assert_eq!(
495 runtime.get_object_result("mesh/object/1"),
496 MeshObjectResult::found("AQID".into(), Some(u64::MAX))
497 );
498 assert_eq!(runtime.get_object("mesh/object/1").as_deref(), Some("AQID"));
499 assert!(runtime
500 .state
501 .lock()
502 .unwrap_or_else(|e| e.into_inner())
503 .objects
504 .get("mesh/object/1")
505 .map(|object| object.suppress_previews)
506 .unwrap_or(false));
507 }
508
509 #[test]
510 fn in_memory_runtime_denies_expired_object_reads() {
511 let runtime = InMemoryMeshRuntime::new();
512 assert!(runtime.put_object(
513 "mesh/object/expired",
514 "AQID",
515 MeshObjectPolicy {
516 expires_at_ms: Some(1),
517 suppress_previews: None,
518 },
519 ));
520
521 assert_eq!(
522 runtime.get_object_result("mesh/object/expired"),
523 MeshObjectResult::unavailable(MeshObjectReadReason::Expired, Some(1))
524 );
525 assert_eq!(runtime.get_object("mesh/object/expired"), None);
526 }
527
528 #[test]
529 fn in_memory_runtime_reports_missing_object() {
530 let runtime = InMemoryMeshRuntime::new();
531
532 assert_eq!(
533 runtime.get_object_result("mesh/object/missing"),
534 MeshObjectResult::unavailable(MeshObjectReadReason::NotFound, None)
535 );
536 assert_eq!(runtime.get_object("mesh/object/missing"), None);
537 }
538
539 #[test]
540 fn test_respond_with_unknown_request_id_returns_false() {
541 let runtime = InMemoryMeshRuntime::new();
542 assert!(
543 !runtime.respond("mesh-req-nonexistent-99", "AQID"),
544 "respond with unknown request_id must return false"
545 );
546 }
547
548 #[test]
549 fn in_memory_runtime_supports_private_object_reads_and_repairs() {
550 let runtime = InMemoryMeshRuntime::new();
551 let reference = runtime.put_private_object(
552 "AQID",
553 MeshObjectPolicy {
554 expires_at_ms: Some(u64::MAX),
555 suppress_previews: Some(true),
556 },
557 );
558
559 assert_eq!(
560 runtime.get_private_object_result(&reference.handle, &reference.capability),
561 MeshObjectResult::found("AQID".into(), Some(u64::MAX))
562 );
563
564 let request_id = runtime.request_private(&reference.handle, &reference.capability, "");
565 let events = runtime.drain_events();
566 assert_eq!(events.len(), 1);
567 assert_eq!(events[0].event, "meshReply");
568 assert_eq!(
569 events[0].payload_json,
570 format!(
571 r#"{{"requestId":"{request_id}","dataBase64":"AQID","expiresAtMs":18446744073709551615}}"#
572 )
573 );
574 }
575
576 #[test]
577 fn test_two_in_memory_runtimes_generate_non_colliding_request_ids() {
578 let runtime_a = InMemoryMeshRuntime::new();
579 let runtime_b = InMemoryMeshRuntime::new();
580
581 let id_a = runtime_a.request("mesh/object/1", "");
582 let id_b = runtime_b.request("mesh/object/1", "");
583
584 assert_ne!(id_a, id_b, "different runtimes at same counter must differ");
585
586 let nonce_a = id_a
587 .split('-')
588 .nth(2)
589 .expect("nonce segment must be present");
590 let nonce_b = id_b
591 .split('-')
592 .nth(2)
593 .expect("nonce segment must be present");
594 assert_ne!(nonce_a, nonce_b, "session nonces must differ");
595 }
596
597 #[test]
598 fn in_memory_runtime_denies_guessed_private_access() {
599 let runtime = InMemoryMeshRuntime::new();
600 let reference = runtime.put_private_object("AQID", MeshObjectPolicy::default());
601
602 assert_eq!(
603 runtime.get_private_object_result(&reference.handle, "wrong-capability"),
604 MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
605 );
606 assert_eq!(
607 runtime.get_private_object_result("guessed-handle", "guessed-capability"),
608 MeshObjectResult::unavailable(MeshObjectReadReason::PolicyDenied, None)
609 );
610 }
611
612 #[test]
613 fn in_memory_runtime_expires_private_capabilities() {
614 let runtime = InMemoryMeshRuntime::new();
615 let reference = runtime.put_private_object(
616 "AQID",
617 MeshObjectPolicy {
618 expires_at_ms: Some(1),
619 suppress_previews: None,
620 },
621 );
622
623 assert_eq!(
624 runtime.get_private_object_result(&reference.handle, &reference.capability),
625 MeshObjectResult::unavailable(MeshObjectReadReason::Expired, Some(1))
626 );
627
628 let request_id = runtime.request_private(&reference.handle, &reference.capability, "");
629 let events = runtime.drain_events();
630 assert_eq!(
631 events[0].payload_json,
632 format!(
633 r#"{{"requestId":"{request_id}","dataBase64":null,"reason":"expired","expiresAtMs":1}}"#
634 )
635 );
636 }
637}