1use dashmap::DashMap;
2use serde::{Deserialize, Serialize};
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use super::ids::{ExportId, IdAllocator, ImportId};
8use crate::RpcTarget;
10
11type PromiseSender =
13 Arc<tokio::sync::Mutex<Option<tokio::sync::watch::Sender<Option<Result<Value, Value>>>>>>;
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
17pub enum Value {
18 Null,
19 Bool(bool),
20 Number(serde_json::Number),
21 String(String),
22 Array(Vec<Value>),
23 Object(std::collections::HashMap<String, Box<Value>>),
24 Date(f64),
25 Error {
26 error_type: String,
27 message: String,
28 stack: Option<String>,
29 },
30 #[serde(skip)]
31 Stub(StubReference),
32 #[serde(skip)]
33 Promise(PromiseReference),
34}
35
36#[derive(Debug, Clone)]
38pub struct StubReference {
39 pub id: String,
40 #[allow(dead_code)]
41 stub: Arc<dyn RpcTarget>,
42}
43
44impl StubReference {
45 pub fn new(stub: Arc<dyn RpcTarget>) -> Self {
46 Self {
47 id: uuid::Uuid::new_v4().to_string(),
48 stub,
49 }
50 }
51
52 pub fn get(&self) -> Arc<dyn RpcTarget> {
53 self.stub.clone()
54 }
55}
56
57impl PartialEq for StubReference {
58 fn eq(&self, other: &Self) -> bool {
59 self.id == other.id
61 }
62}
63
/// Reference to a promise, identified purely by its string `id`.
#[derive(Debug, Clone, PartialEq)]
pub struct PromiseReference {
    /// Identifier of the referenced promise.
    pub id: String,
}
69
70#[derive(Debug)]
72pub enum PromiseState {
73 Pending(tokio::sync::watch::Receiver<Option<Result<Value, Value>>>),
74 Resolved(Value),
75 Rejected(Value),
76}
77
78#[derive(Debug)]
80pub struct ImportEntry {
81 pub value: ImportValue,
82 pub refcount: AtomicU32,
83}
84
85#[derive(Debug, Clone)]
87pub enum ImportValue {
88 Stub(StubReference),
89 Promise(PromiseReference),
90 Value(Value),
91}
92
93#[derive(Debug)]
95pub struct ExportEntry {
96 pub value: ExportValue,
97 pub export_count: AtomicU32,
98}
99
100#[derive(Debug)]
102pub enum ExportValue {
103 Stub(StubReference),
104 Promise(PromiseSender),
105 Resolved(Value),
106 Rejected(Value),
107}
108
109#[derive(Debug)]
111pub enum ExportValueRef {
112 Stub(StubReference),
113 Promise(PromiseSender),
114 Resolved(Value),
115 Rejected(Value),
116}
117
118#[derive(Debug)]
120pub struct ImportTable {
121 allocator: Arc<IdAllocator>,
122 entries: DashMap<ImportId, ImportEntry>,
123}
124
125impl ImportTable {
126 pub fn new(allocator: Arc<IdAllocator>) -> Self {
128 Self {
129 allocator,
130 entries: DashMap::new(),
131 }
132 }
133
134 pub fn with_default_allocator() -> Self {
136 Self {
137 allocator: Arc::new(IdAllocator::new()),
138 entries: DashMap::new(),
139 }
140 }
141
142 pub fn allocate_local(&self) -> ImportId {
144 self.allocator.allocate_import()
145 }
146
147 pub fn insert(&self, id: ImportId, value: ImportValue) -> Result<(), TableError> {
149 let entry = ImportEntry {
150 value,
151 refcount: AtomicU32::new(1),
152 };
153
154 if self.entries.insert(id, entry).is_some() {
155 return Err(TableError::DuplicateImport(id));
156 }
157
158 Ok(())
159 }
160
161 pub fn get(&self, id: ImportId) -> Option<ImportValue> {
163 self.entries.get(&id).map(|entry| match &entry.value {
164 ImportValue::Stub(stub) => ImportValue::Stub(stub.clone()),
165 ImportValue::Promise(promise) => ImportValue::Promise(promise.clone()),
166 ImportValue::Value(val) => ImportValue::Value(val.clone()),
167 })
168 }
169
170 pub fn add_ref(&self, id: ImportId) -> Result<(), TableError> {
172 self.entries
173 .get(&id)
174 .map(|entry| {
175 entry.refcount.fetch_add(1, Ordering::SeqCst);
176 })
177 .ok_or(TableError::UnknownImport(id))
178 }
179
180 pub fn release(&self, id: ImportId, refcount: u32) -> Result<bool, TableError> {
182 let mut should_remove = false;
183
184 self.entries.alter(&id, |_key, entry| {
185 let current = entry.refcount.load(Ordering::SeqCst);
186 if current >= refcount {
187 let new_count = current - refcount;
188 entry.refcount.store(new_count, Ordering::SeqCst);
189 if new_count == 0 {
190 should_remove = true;
191 }
192 }
193 entry
194 });
195
196 if should_remove {
197 self.entries.remove(&id);
198 Ok(true)
199 } else {
200 Ok(false)
201 }
202 }
203
204 pub fn resolve_promise(&self, id: ImportId, value: Value) -> Result<(), TableError> {
206 self.entries.alter(&id, |_key, mut entry| {
207 if let ImportValue::Promise(_promise) = &mut entry.value {
208 entry.value = ImportValue::Value(value);
210 }
211 entry
212 });
213 Ok(())
214 }
215}
216
217#[derive(Debug)]
219pub struct ExportTable {
220 allocator: Arc<IdAllocator>,
221 entries: DashMap<ExportId, ExportEntry>,
222}
223
224impl ExportTable {
225 pub fn new(allocator: Arc<IdAllocator>) -> Self {
227 Self {
228 allocator,
229 entries: DashMap::new(),
230 }
231 }
232
233 pub fn with_default_allocator() -> Self {
235 Self {
236 allocator: Arc::new(IdAllocator::new()),
237 entries: DashMap::new(),
238 }
239 }
240
241 pub fn allocate_local(&self) -> ExportId {
243 self.allocator.allocate_export()
244 }
245
246 pub fn insert(&self, id: ExportId, value: ExportValue) -> Result<(), TableError> {
248 let entry = ExportEntry {
249 value,
250 export_count: AtomicU32::new(1),
251 };
252
253 if self.entries.insert(id, entry).is_some() {
254 return Err(TableError::DuplicateExport(id));
255 }
256
257 Ok(())
258 }
259
260 pub fn export_stub(&self, stub: Arc<dyn RpcTarget>) -> ExportId {
262 let id = self.allocate_local();
263 let stub_ref = StubReference::new(stub);
264 let _ = self.insert(id, ExportValue::Stub(stub_ref));
265 id
266 }
267
268 pub fn export_promise(
270 &self,
271 ) -> (
272 ExportId,
273 tokio::sync::watch::Receiver<Option<Result<Value, Value>>>,
274 ) {
275 let id = self.allocate_local();
276 let (tx, rx) = tokio::sync::watch::channel(None);
277 let _ = self.insert(
278 id,
279 ExportValue::Promise(Arc::new(tokio::sync::Mutex::new(Some(tx)))),
280 );
281 (id, rx)
282 }
283
284 pub fn get(&self, id: ExportId) -> Option<ExportValueRef> {
286 self.entries.get(&id).map(|entry| match &entry.value {
287 ExportValue::Stub(stub) => ExportValueRef::Stub(stub.clone()),
288 ExportValue::Promise(promise) => ExportValueRef::Promise(promise.clone()),
289 ExportValue::Resolved(val) => ExportValueRef::Resolved(val.clone()),
290 ExportValue::Rejected(val) => ExportValueRef::Rejected(val.clone()),
291 })
292 }
293
294 pub async fn resolve(&self, id: ExportId, value: Value) -> Result<(), TableError> {
296 if let Some(mut entry) = self.entries.get_mut(&id) {
297 match &entry.value {
298 ExportValue::Promise(promise_sender) => {
299 if let Some(sender) = promise_sender.lock().await.take() {
301 let _ = sender.send(Some(Ok(value.clone())));
302 }
303 entry.value = ExportValue::Resolved(value);
305 }
306 _ => {
307 }
309 }
310 }
311 Ok(())
312 }
313
314 pub async fn reject(&self, id: ExportId, error: Value) -> Result<(), TableError> {
316 if let Some(mut entry) = self.entries.get_mut(&id) {
317 match &entry.value {
318 ExportValue::Promise(promise_sender) => {
319 if let Some(sender) = promise_sender.lock().await.take() {
321 let _ = sender.send(Some(Err(error.clone())));
322 }
323 entry.value = ExportValue::Rejected(error);
325 }
326 _ => {
327 }
329 }
330 }
331 Ok(())
332 }
333
334 pub fn add_export(&self, id: ExportId) -> Result<(), TableError> {
336 self.entries
337 .get(&id)
338 .map(|entry| {
339 entry.export_count.fetch_add(1, Ordering::SeqCst);
340 })
341 .ok_or(TableError::UnknownExport(id))
342 }
343
344 pub fn release(&self, id: ExportId) -> Result<bool, TableError> {
346 let mut should_remove = false;
347
348 self.entries.alter(&id, |_key, entry| {
349 let current = entry.export_count.load(Ordering::SeqCst);
350 if current > 0 {
351 let new_count = current - 1;
352 entry.export_count.store(new_count, Ordering::SeqCst);
353 if new_count == 0 {
354 should_remove = true;
355 }
356 }
357 entry
358 });
359
360 if should_remove {
361 self.entries.remove(&id);
362 Ok(true)
363 } else {
364 Ok(false)
365 }
366 }
367}
368
369#[derive(Debug, thiserror::Error)]
371pub enum TableError {
372 #[error("Duplicate import ID: {0}")]
373 DuplicateImport(ImportId),
374
375 #[error("Duplicate export ID: {0}")]
376 DuplicateExport(ExportId),
377
378 #[error("Unknown import ID: {0}")]
379 UnknownImport(ImportId),
380
381 #[error("Unknown export ID: {0}")]
382 UnknownExport(ExportId),
383
384 #[error("Cannot resolve non-promise export")]
385 NotAPromise,
386
387 #[error("Export already resolved")]
388 AlreadyResolved,
389}
390
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserts a stub, reads it back, then reference-counts it down to removal.
    #[tokio::test]
    async fn test_import_table() {
        let table = ImportTable::new(Arc::new(IdAllocator::new()));

        let id = table.allocate_local();
        assert_eq!(id, ImportId(1));

        let target = Arc::new(crate::MockRpcTarget::new());
        table
            .insert(id, ImportValue::Stub(StubReference::new(target)))
            .unwrap();

        assert!(
            matches!(table.get(id).unwrap(), ImportValue::Stub(_)),
            "Expected stub"
        );

        // Two references are held after `add_ref`: the first release keeps the
        // entry alive, the second one removes it.
        table.add_ref(id).unwrap();
        let removed_first = table.release(id, 1).unwrap();
        assert!(!removed_first);
        let removed_second = table.release(id, 1).unwrap();
        assert!(removed_second);
        assert!(table.get(id).is_none());
    }

    /// Resolving an exported promise notifies the receiver and updates the table.
    #[tokio::test]
    async fn test_export_table() {
        let table = ExportTable::new(Arc::new(IdAllocator::new()));

        let (id, mut rx) = table.export_promise();
        assert_eq!(id, ExportId(-1));

        let resolved = table.resolve(id, Value::String("result".to_string())).await;
        resolved.unwrap();

        rx.changed().await.unwrap();
        {
            let outcome = rx.borrow();
            match outcome.as_ref().unwrap() {
                Ok(Value::String(s)) => assert_eq!(s, "result"),
                _ => panic!("Expected resolved string"),
            }
        }

        match table.get(id).unwrap() {
            ExportValueRef::Resolved(Value::String(s)) => assert_eq!(s, "result"),
            _ => panic!("Expected resolved export"),
        }
    }

    /// Exporting a stub makes it retrievable as a stub entry.
    #[test]
    fn test_stub_export() {
        let table = ExportTable::new(Arc::new(IdAllocator::new()));

        let target = Arc::new(crate::MockRpcTarget::new());
        let id = table.export_stub(target);

        assert!(
            matches!(table.get(id).unwrap(), ExportValueRef::Stub(_)),
            "Expected stub export"
        );
    }
}