capnweb_core/protocol/
tables.rs

1use dashmap::DashMap;
2use serde::{Deserialize, Serialize};
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5// use tokio::sync::oneshot; // TODO: Remove when promise handling is implemented
6
7use super::ids::{ExportId, IdAllocator, ImportId};
8// use super::expression::Expression; // TODO: Remove when expression integration is complete
9use crate::RpcTarget;
10
11/// Type alias for complex promise sender type
12type PromiseSender =
13    Arc<tokio::sync::Mutex<Option<tokio::sync::watch::Sender<Option<Result<Value, Value>>>>>>;
14
15/// Value that can be stored in tables
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
17pub enum Value {
18    Null,
19    Bool(bool),
20    Number(serde_json::Number),
21    String(String),
22    Array(Vec<Value>),
23    Object(std::collections::HashMap<String, Box<Value>>),
24    Date(f64),
25    Error {
26        error_type: String,
27        message: String,
28        stack: Option<String>,
29    },
30    #[serde(skip)]
31    Stub(StubReference),
32    #[serde(skip)]
33    Promise(PromiseReference),
34}
35
36/// Reference to a stub (since `Arc<dyn RpcTarget>` can't implement Clone/Debug directly)
37#[derive(Debug, Clone)]
38pub struct StubReference {
39    pub id: String,
40    #[allow(dead_code)]
41    stub: Arc<dyn RpcTarget>,
42}
43
44impl StubReference {
45    pub fn new(stub: Arc<dyn RpcTarget>) -> Self {
46        Self {
47            id: uuid::Uuid::new_v4().to_string(),
48            stub,
49        }
50    }
51
52    pub fn get(&self) -> Arc<dyn RpcTarget> {
53        self.stub.clone()
54    }
55}
56
57impl PartialEq for StubReference {
58    fn eq(&self, other: &Self) -> bool {
59        // Compare by ID since Arc<dyn RpcTarget> can't be compared
60        self.id == other.id
61    }
62}
63
64/// Reference to a promise
65#[derive(Debug, Clone, PartialEq)]
66pub struct PromiseReference {
67    pub id: String,
68}
69
70/// State of a promise
71#[derive(Debug)]
72pub enum PromiseState {
73    Pending(tokio::sync::watch::Receiver<Option<Result<Value, Value>>>),
74    Resolved(Value),
75    Rejected(Value),
76}
77
78/// Entry in the import table
79#[derive(Debug)]
80pub struct ImportEntry {
81    pub value: ImportValue,
82    pub refcount: AtomicU32,
83}
84
85/// Value stored in an import
86#[derive(Debug, Clone)]
87pub enum ImportValue {
88    Stub(StubReference),
89    Promise(PromiseReference),
90    Value(Value),
91}
92
93/// Entry in the export table
94#[derive(Debug)]
95pub struct ExportEntry {
96    pub value: ExportValue,
97    pub export_count: AtomicU32,
98}
99
100/// Value stored in an export
101#[derive(Debug)]
102pub enum ExportValue {
103    Stub(StubReference),
104    Promise(PromiseSender),
105    Resolved(Value),
106    Rejected(Value),
107}
108
109/// Reference to export value (for returning from get())
110#[derive(Debug)]
111pub enum ExportValueRef {
112    Stub(StubReference),
113    Promise(PromiseSender),
114    Resolved(Value),
115    Rejected(Value),
116}
117
118/// Import table manages imported capabilities and promises
119#[derive(Debug)]
120pub struct ImportTable {
121    allocator: Arc<IdAllocator>,
122    entries: DashMap<ImportId, ImportEntry>,
123}
124
125impl ImportTable {
126    /// Create a new import table with the given allocator
127    pub fn new(allocator: Arc<IdAllocator>) -> Self {
128        Self {
129            allocator,
130            entries: DashMap::new(),
131        }
132    }
133
134    /// Create a new import table with a default allocator
135    pub fn with_default_allocator() -> Self {
136        Self {
137            allocator: Arc::new(IdAllocator::new()),
138            entries: DashMap::new(),
139        }
140    }
141
142    /// Allocate a new local import ID
143    pub fn allocate_local(&self) -> ImportId {
144        self.allocator.allocate_import()
145    }
146
147    /// Insert a new import entry
148    pub fn insert(&self, id: ImportId, value: ImportValue) -> Result<(), TableError> {
149        let entry = ImportEntry {
150            value,
151            refcount: AtomicU32::new(1),
152        };
153
154        if self.entries.insert(id, entry).is_some() {
155            return Err(TableError::DuplicateImport(id));
156        }
157
158        Ok(())
159    }
160
161    /// Get an import entry
162    pub fn get(&self, id: ImportId) -> Option<ImportValue> {
163        self.entries.get(&id).map(|entry| match &entry.value {
164            ImportValue::Stub(stub) => ImportValue::Stub(stub.clone()),
165            ImportValue::Promise(promise) => ImportValue::Promise(promise.clone()),
166            ImportValue::Value(val) => ImportValue::Value(val.clone()),
167        })
168    }
169
170    /// Increment the refcount for an import
171    pub fn add_ref(&self, id: ImportId) -> Result<(), TableError> {
172        self.entries
173            .get(&id)
174            .map(|entry| {
175                entry.refcount.fetch_add(1, Ordering::SeqCst);
176            })
177            .ok_or(TableError::UnknownImport(id))
178    }
179
180    /// Release an import with the given refcount
181    pub fn release(&self, id: ImportId, refcount: u32) -> Result<bool, TableError> {
182        let mut should_remove = false;
183
184        self.entries.alter(&id, |_key, entry| {
185            let current = entry.refcount.load(Ordering::SeqCst);
186            if current >= refcount {
187                let new_count = current - refcount;
188                entry.refcount.store(new_count, Ordering::SeqCst);
189                if new_count == 0 {
190                    should_remove = true;
191                }
192            }
193            entry
194        });
195
196        if should_remove {
197            self.entries.remove(&id);
198            Ok(true)
199        } else {
200            Ok(false)
201        }
202    }
203
204    /// Update a promise import to resolved state
205    pub fn resolve_promise(&self, id: ImportId, value: Value) -> Result<(), TableError> {
206        self.entries.alter(&id, |_key, mut entry| {
207            if let ImportValue::Promise(_promise) = &mut entry.value {
208                // Update to resolved value
209                entry.value = ImportValue::Value(value);
210            }
211            entry
212        });
213        Ok(())
214    }
215}
216
217/// Export table manages exported capabilities and promises
218#[derive(Debug)]
219pub struct ExportTable {
220    allocator: Arc<IdAllocator>,
221    entries: DashMap<ExportId, ExportEntry>,
222}
223
224impl ExportTable {
225    /// Create a new export table with the given allocator
226    pub fn new(allocator: Arc<IdAllocator>) -> Self {
227        Self {
228            allocator,
229            entries: DashMap::new(),
230        }
231    }
232
233    /// Create a new export table with a default allocator
234    pub fn with_default_allocator() -> Self {
235        Self {
236            allocator: Arc::new(IdAllocator::new()),
237            entries: DashMap::new(),
238        }
239    }
240
241    /// Allocate a new local export ID
242    pub fn allocate_local(&self) -> ExportId {
243        self.allocator.allocate_export()
244    }
245
246    /// Insert a new export entry
247    pub fn insert(&self, id: ExportId, value: ExportValue) -> Result<(), TableError> {
248        let entry = ExportEntry {
249            value,
250            export_count: AtomicU32::new(1),
251        };
252
253        if self.entries.insert(id, entry).is_some() {
254            return Err(TableError::DuplicateExport(id));
255        }
256
257        Ok(())
258    }
259
260    /// Export a stub
261    pub fn export_stub(&self, stub: Arc<dyn RpcTarget>) -> ExportId {
262        let id = self.allocate_local();
263        let stub_ref = StubReference::new(stub);
264        let _ = self.insert(id, ExportValue::Stub(stub_ref));
265        id
266    }
267
268    /// Export a new promise
269    pub fn export_promise(
270        &self,
271    ) -> (
272        ExportId,
273        tokio::sync::watch::Receiver<Option<Result<Value, Value>>>,
274    ) {
275        let id = self.allocate_local();
276        let (tx, rx) = tokio::sync::watch::channel(None);
277        let _ = self.insert(
278            id,
279            ExportValue::Promise(Arc::new(tokio::sync::Mutex::new(Some(tx)))),
280        );
281        (id, rx)
282    }
283
284    /// Get an export entry (returns clone for stub/value types)
285    pub fn get(&self, id: ExportId) -> Option<ExportValueRef> {
286        self.entries.get(&id).map(|entry| match &entry.value {
287            ExportValue::Stub(stub) => ExportValueRef::Stub(stub.clone()),
288            ExportValue::Promise(promise) => ExportValueRef::Promise(promise.clone()),
289            ExportValue::Resolved(val) => ExportValueRef::Resolved(val.clone()),
290            ExportValue::Rejected(val) => ExportValueRef::Rejected(val.clone()),
291        })
292    }
293
294    /// Resolve an exported promise
295    pub async fn resolve(&self, id: ExportId, value: Value) -> Result<(), TableError> {
296        if let Some(mut entry) = self.entries.get_mut(&id) {
297            match &entry.value {
298                ExportValue::Promise(promise_sender) => {
299                    // Get the sender and send resolution
300                    if let Some(sender) = promise_sender.lock().await.take() {
301                        let _ = sender.send(Some(Ok(value.clone())));
302                    }
303                    // Update entry to resolved state
304                    entry.value = ExportValue::Resolved(value);
305                }
306                _ => {
307                    // Already resolved or not a promise
308                }
309            }
310        }
311        Ok(())
312    }
313
314    /// Reject an exported promise
315    pub async fn reject(&self, id: ExportId, error: Value) -> Result<(), TableError> {
316        if let Some(mut entry) = self.entries.get_mut(&id) {
317            match &entry.value {
318                ExportValue::Promise(promise_sender) => {
319                    // Get the sender and send rejection
320                    if let Some(sender) = promise_sender.lock().await.take() {
321                        let _ = sender.send(Some(Err(error.clone())));
322                    }
323                    // Update entry to rejected state
324                    entry.value = ExportValue::Rejected(error);
325                }
326                _ => {
327                    // Already resolved or not a promise
328                }
329            }
330        }
331        Ok(())
332    }
333
334    /// Increment the export count
335    pub fn add_export(&self, id: ExportId) -> Result<(), TableError> {
336        self.entries
337            .get(&id)
338            .map(|entry| {
339                entry.export_count.fetch_add(1, Ordering::SeqCst);
340            })
341            .ok_or(TableError::UnknownExport(id))
342    }
343
344    /// Release an export
345    pub fn release(&self, id: ExportId) -> Result<bool, TableError> {
346        let mut should_remove = false;
347
348        self.entries.alter(&id, |_key, entry| {
349            let current = entry.export_count.load(Ordering::SeqCst);
350            if current > 0 {
351                let new_count = current - 1;
352                entry.export_count.store(new_count, Ordering::SeqCst);
353                if new_count == 0 {
354                    should_remove = true;
355                }
356            }
357            entry
358        });
359
360        if should_remove {
361            self.entries.remove(&id);
362            Ok(true)
363        } else {
364            Ok(false)
365        }
366    }
367}
368
369/// Error type for table operations
370#[derive(Debug, thiserror::Error)]
371pub enum TableError {
372    #[error("Duplicate import ID: {0}")]
373    DuplicateImport(ImportId),
374
375    #[error("Duplicate export ID: {0}")]
376    DuplicateExport(ExportId),
377
378    #[error("Unknown import ID: {0}")]
379    UnknownImport(ImportId),
380
381    #[error("Unknown export ID: {0}")]
382    UnknownExport(ExportId),
383
384    #[error("Cannot resolve non-promise export")]
385    NotAPromise,
386
387    #[error("Export already resolved")]
388    AlreadyResolved,
389}
390
391#[cfg(test)]
392mod tests {
393    use super::*;
394
395    #[tokio::test]
396    async fn test_import_table() {
397        let allocator = Arc::new(IdAllocator::new());
398        let table = ImportTable::new(allocator.clone());
399
400        // Test insertion and retrieval
401        let id = table.allocate_local();
402        assert_eq!(id, ImportId(1));
403
404        let stub = Arc::new(crate::MockRpcTarget::new());
405        let stub_ref = StubReference::new(stub);
406        table.insert(id, ImportValue::Stub(stub_ref)).unwrap();
407
408        match table.get(id).unwrap() {
409            ImportValue::Stub(_) => {}
410            _ => panic!("Expected stub"),
411        }
412
413        // Test refcounting
414        table.add_ref(id).unwrap();
415        assert!(!table.release(id, 1).unwrap()); // Should not remove yet
416        assert!(table.release(id, 1).unwrap()); // Should remove now
417        assert!(table.get(id).is_none());
418    }
419
420    #[tokio::test]
421    async fn test_export_table() {
422        let allocator = Arc::new(IdAllocator::new());
423        let table = ExportTable::new(allocator.clone());
424
425        // Test promise export and resolution
426        let (id, mut rx) = table.export_promise();
427        assert_eq!(id, ExportId(-1));
428
429        // Resolve the promise
430        table
431            .resolve(id, Value::String("result".to_string()))
432            .await
433            .unwrap();
434
435        // Check that watchers receive the resolution
436        rx.changed().await.unwrap();
437        match rx.borrow().as_ref().unwrap() {
438            Ok(Value::String(s)) => assert_eq!(s, "result"),
439            _ => panic!("Expected resolved string"),
440        }
441
442        // Check that the export is now resolved
443        match table.get(id).unwrap() {
444            ExportValueRef::Resolved(Value::String(s)) => assert_eq!(s, "result"),
445            _ => panic!("Expected resolved export"),
446        }
447    }
448
449    #[test]
450    fn test_stub_export() {
451        let allocator = Arc::new(IdAllocator::new());
452        let table = ExportTable::new(allocator.clone());
453
454        let stub = Arc::new(crate::MockRpcTarget::new());
455        let id = table.export_stub(stub.clone());
456
457        match table.get(id).unwrap() {
458            ExportValueRef::Stub(_) => {}
459            _ => panic!("Expected stub export"),
460        }
461    }
462}