capnweb_core/protocol/
session.rs

// RPC session management for the Cap'n Web protocol.
// Processes incoming protocol messages and holds the per-session state.
3
4use super::expression::Expression;
5use super::ids::{IdAllocator, ImportId};
6use super::message::Message;
7// use super::ids::ExportId; // TODO: Remove when export handling is implemented
8use super::evaluator::ExpressionEvaluator;
9use super::tables::{ExportTable, ImportTable, Value};
10use std::sync::Arc;
11use tokio::sync::Mutex;
12
13/// RPC session state
14pub struct RpcSession {
15    pub allocator: Arc<IdAllocator>,
16    pub imports: Arc<ImportTable>,
17    pub exports: Arc<ExportTable>,
18    pub evaluator: Arc<Mutex<ExpressionEvaluator>>,
19}
20
21impl Default for RpcSession {
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
27impl RpcSession {
28    /// Create a new RPC session
29    pub fn new() -> Self {
30        let allocator = Arc::new(IdAllocator::new());
31        let imports = Arc::new(ImportTable::new(allocator.clone()));
32        let exports = Arc::new(ExportTable::new(allocator.clone()));
33        let evaluator = Arc::new(Mutex::new(ExpressionEvaluator::new(
34            imports.clone(),
35            exports.clone(),
36        )));
37
38        Self {
39            allocator,
40            imports,
41            exports,
42            evaluator,
43        }
44    }
45
46    /// Handle an incoming message
47    pub async fn handle_message(&self, msg: Message) -> Result<(), SessionError> {
48        match msg {
49            Message::Push(expr) => {
50                // Allocate import ID and evaluate expression
51                let _import_id = self.imports.allocate_local();
52
53                // TODO: Evaluate expression and store result
54                let _ = self.evaluator.lock().await.evaluate(expr).await?;
55
56                Ok(())
57            }
58
59            Message::Pull(_import_id) => {
60                // Request resolution of an import
61                // TODO: Send resolve message for the import
62                Ok(())
63            }
64
65            Message::Resolve(export_id, expr) => {
66                // Resolve an export with a value
67                let value = self.evaluator.lock().await.evaluate(expr).await?;
68                self.exports.resolve(export_id, value).await?;
69                Ok(())
70            }
71
72            Message::Reject(export_id, expr) => {
73                // Reject an export with an error
74                let error = self.evaluator.lock().await.evaluate(expr).await?;
75                self.exports.reject(export_id, error).await?;
76                Ok(())
77            }
78
79            Message::Release(import_id, refcount) => {
80                // Release an import
81                self.imports.release(import_id, refcount)?;
82                Ok(())
83            }
84
85            Message::Abort(expr) => {
86                // Terminate the session
87                let _ = self.evaluator.lock().await.evaluate(expr).await?;
88                // TODO: Clean up session
89                Ok(())
90            }
91        }
92    }
93
94    /// Send a push message
95    pub async fn push(&self, _expr: Expression) -> ImportId {
96        // TODO: Send push message over transport
97
98        self.imports.allocate_local()
99    }
100
101    /// Send a pull message
102    pub async fn pull(&self, _import_id: ImportId) -> Result<Value, SessionError> {
103        // TODO: Send pull message and wait for resolution
104        Err(SessionError::NotImplemented)
105    }
106}
107
108#[derive(Debug, thiserror::Error)]
109pub enum SessionError {
110    #[error("Feature not yet implemented")]
111    NotImplemented,
112
113    #[error("Evaluator error: {0}")]
114    EvaluatorError(#[from] super::evaluator::EvaluatorError),
115
116    #[error("Table error: {0}")]
117    TableError(#[from] super::tables::TableError),
118
119    #[error("Transport error")]
120    TransportError,
121}