capnweb_client/
client.rs

1// Cap'n Web Client Library
2// Implements the client side of the Cap'n Web protocol with full support for:
3// - Batching multiple operations in a single request
4// - Promise pipelining for dependent operations
5// - Capability passing and lifecycle management
6// - Error handling and validation
7
8use anyhow::{Context, Result};
9use capnweb_core::CapId;
10use reqwest::Client as HttpClient;
11use serde_json::{json, Value};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use tokio::sync::RwLock;
16use tracing::{debug, trace};
17
18/// Client configuration
19#[derive(Debug, Clone)]
20pub struct ClientConfig {
21    /// Base URL for the RPC endpoint
22    pub url: String,
23    /// Maximum number of operations in a single batch
24    pub max_batch_size: usize,
25    /// Request timeout in milliseconds
26    pub timeout_ms: u64,
27}
28
29impl Default for ClientConfig {
30    fn default() -> Self {
31        Self {
32            url: "http://localhost:3000/rpc/batch".to_string(),
33            max_batch_size: 100,
34            timeout_ms: 30000,
35        }
36    }
37}
38
39/// Main client struct for Cap'n Web protocol communication
40pub struct Client {
41    config: ClientConfig,
42    http_client: HttpClient,
43    next_call_id: AtomicU64,
44    capabilities: Arc<RwLock<HashMap<CapId, Value>>>,
45}
46
47impl Client {
48    /// Create a new client with the given configuration
49    pub fn new(config: ClientConfig) -> Result<Self> {
50        let http_client = HttpClient::builder()
51            .timeout(std::time::Duration::from_millis(config.timeout_ms))
52            .build()
53            .context("Failed to build HTTP client")?;
54
55        Ok(Self {
56            config,
57            http_client,
58            next_call_id: AtomicU64::new(1), // Start from 1 for Cap'n Web protocol
59            capabilities: Arc::new(RwLock::new(HashMap::new())),
60        })
61    }
62
63    /// Create a new client with default configuration
64    pub fn new_with_url(url: &str) -> Result<Self> {
65        let config = ClientConfig {
66            url: url.to_string(),
67            ..Default::default()
68        };
69        Self::new(config)
70    }
71
72    /// Create a new batch builder for batching multiple operations
73    pub fn batch(&self) -> BatchBuilder<'_> {
74        BatchBuilder::new(self)
75    }
76
77    /// Perform a single RPC call
78    pub async fn call(&self, cap_id: CapId, method: &str, args: Vec<Value>) -> Result<Value> {
79        // Cap'n Web protocol uses import IDs starting from 1
80        let import_id = self.next_call_id.fetch_add(1, Ordering::SeqCst);
81
82        // Create batch with single operation
83        let messages = vec![
84            json!(["push", ["call", cap_id.as_u64(), [method], args]]),
85            json!(["pull", import_id]),
86        ];
87
88        let results = self.send_batch(messages).await?;
89
90        // Extract result
91        for result in results {
92            if let Some(arr) = result.as_array() {
93                if arr.len() >= 3
94                    && (arr[0] == "result" || arr[0] == "resolve")
95                    && arr[1] == import_id
96                {
97                    return Ok(arr[2].clone());
98                } else if arr.len() >= 3 && arr[0] == "error" && arr[1] == import_id {
99                    let error = arr[2]
100                        .as_object()
101                        .and_then(|o| o.get("message"))
102                        .and_then(|m| m.as_str())
103                        .unwrap_or("Unknown error");
104                    return Err(anyhow::anyhow!("RPC error: {}", error));
105                }
106            }
107        }
108
109        Err(anyhow::anyhow!("No result received for call"))
110    }
111
112    /// Send a batch of messages to the server
113    async fn send_batch(&self, messages: Vec<Value>) -> Result<Vec<Value>> {
114        // Convert messages to newline-delimited JSON
115        let body_parts: Result<Vec<String>> = messages
116            .iter()
117            .map(|m| serde_json::to_string(m).context("Failed to serialize message"))
118            .collect();
119        let body = body_parts?.join("\n");
120
121        debug!(
122            "Sending batch request to {}: {} messages",
123            self.config.url,
124            messages.len()
125        );
126        trace!("Request body:\n{}", body);
127
128        let response = self
129            .http_client
130            .post(&self.config.url)
131            .header("Content-Type", "text/plain")
132            .body(body)
133            .send()
134            .await
135            .context("Failed to send HTTP request")?;
136
137        let status = response.status();
138        let text = response
139            .text()
140            .await
141            .context("Failed to read response body")?;
142
143        if !status.is_success() {
144            return Err(anyhow::anyhow!("HTTP error {}: {}", status, text));
145        }
146
147        trace!("Response body:\n{}", text);
148
149        // Parse each line as a separate message
150        let mut results = Vec::new();
151        for line in text.lines() {
152            let line = line.trim();
153            if !line.is_empty() {
154                let value: Value = serde_json::from_str(line)
155                    .with_context(|| format!("Failed to parse response line: {}", line))?;
156                results.push(value);
157            }
158        }
159
160        debug!("Received {} response messages", results.len());
161        Ok(results)
162    }
163
164    /// Register a capability for use in future calls
165    pub async fn register_capability(&self, id: CapId, cap: Value) {
166        let mut caps = self.capabilities.write().await;
167        caps.insert(id, cap);
168    }
169
170    /// Get a registered capability
171    pub async fn get_capability(&self, id: CapId) -> Option<Value> {
172        let caps = self.capabilities.read().await;
173        caps.get(&id).cloned()
174    }
175
176    /// Dispose of a capability
177    pub async fn dispose_capability(&self, id: CapId) -> Result<()> {
178        let messages = vec![json!(["dispose", id.as_u64()])];
179
180        self.send_batch(messages).await?;
181
182        let mut caps = self.capabilities.write().await;
183        caps.remove(&id);
184
185        Ok(())
186    }
187}
188
189/// Builder for creating batched operations
190pub struct BatchBuilder<'a> {
191    client: &'a Client,
192    operations: Vec<BatchOperation>,
193    next_result_id: u64,
194}
195
196/// A single operation in a batch
197#[derive(Debug, Clone)]
198pub struct BatchOperation {
199    pub id: u64,
200    pub message: Value,
201    pub is_pipeline: bool,
202    pub depends_on: Option<u64>,
203}
204
205/// Handle for a pending operation result
206#[derive(Debug, Clone)]
207pub struct PendingResult {
208    pub id: u64,
209    pub path: Vec<String>,
210}
211
212impl<'a> BatchBuilder<'a> {
213    fn new(client: &'a Client) -> Self {
214        Self {
215            client,
216            operations: Vec::new(),
217            next_result_id: 1, // Start from 1 to match server's import_id assignment
218        }
219    }
220
221    /// Add a call operation to the batch
222    pub fn call(&mut self, cap_id: CapId, method: &str, args: Vec<Value>) -> PendingResult {
223        let result_id = self.next_result_id;
224        self.next_result_id += 1;
225
226        let message = json!(["push", ["call", cap_id.as_u64(), [method], args]]);
227
228        self.operations.push(BatchOperation {
229            id: result_id,
230            message,
231            is_pipeline: false,
232            depends_on: None,
233        });
234
235        PendingResult {
236            id: result_id,
237            path: vec![],
238        }
239    }
240
241    /// Add a pipeline operation that depends on a previous result
242    pub fn pipeline(
243        &mut self,
244        base: &PendingResult,
245        path: Vec<&str>,
246        method: &str,
247        args: Vec<Value>,
248    ) -> PendingResult {
249        let result_id = self.next_result_id;
250        self.next_result_id += 1;
251
252        // Build the pipeline arguments, replacing references to the base result
253        let mut pipeline_args = Vec::new();
254
255        // If path is provided, create a pipeline expression to extract the value
256        if !path.is_empty() {
257            // Create a pipeline expression to extract the value from the base result
258            let path_strings: Vec<Value> = path.iter().map(|s| json!(s)).collect();
259            pipeline_args.push(json!(["pipeline", base.id, path_strings]));
260        }
261
262        // Add any additional arguments
263        pipeline_args.extend(args);
264
265        // Create the pipeline message: ["pipeline", import_id, [method], [args]]
266        let message = json!([
267            "push",
268            ["pipeline", base.id, vec![json!(method)], pipeline_args]
269        ]);
270
271        self.operations.push(BatchOperation {
272            id: result_id,
273            message,
274            is_pipeline: true,
275            depends_on: Some(base.id),
276        });
277
278        PendingResult {
279            id: result_id,
280            path: vec![],
281        }
282    }
283
284    /// Create a reference to a result field for use in arguments
285    pub fn reference(&self, result: &PendingResult, field: &str) -> PendingResult {
286        PendingResult {
287            id: result.id,
288            path: {
289                let mut path = result.path.clone();
290                path.push(field.to_string());
291                path
292            },
293        }
294    }
295
296    /// Execute the batch and return results
297    pub async fn execute(self) -> Result<BatchResults> {
298        if self.operations.is_empty() {
299            return Ok(BatchResults {
300                results: HashMap::new(),
301            });
302        }
303
304        // Build messages: all pushes, then all pulls
305        let mut messages = Vec::new();
306
307        // Add all push operations
308        for op in &self.operations {
309            messages.push(op.message.clone());
310        }
311
312        // Add pull operations for all results
313        for op in &self.operations {
314            messages.push(json!(["pull", op.id]));
315        }
316
317        // Send batch
318        let responses = self.client.send_batch(messages).await?;
319
320        // Parse results
321        let mut results = HashMap::new();
322
323        for response in responses {
324            if let Some(arr) = response.as_array() {
325                if arr.len() >= 2 {
326                    let msg_type = arr[0].as_str().unwrap_or("");
327
328                    match msg_type {
329                        "result" | "resolve" => {
330                            if let Some(id) = arr[1].as_u64() {
331                                let value = arr.get(2).cloned().unwrap_or(json!(null));
332                                results.insert(id, Ok(value));
333                            }
334                        }
335                        "error" => {
336                            if let Some(id) = arr[1].as_u64() {
337                                let error_obj = arr.get(2).cloned().unwrap_or(json!({}));
338                                let error_msg = error_obj
339                                    .get("message")
340                                    .and_then(|m| m.as_str())
341                                    .unwrap_or("Unknown error");
342                                results
343                                    .insert(id, Err(anyhow::anyhow!("RPC error: {}", error_msg)));
344                            }
345                        }
346                        _ => {}
347                    }
348                }
349            }
350        }
351
352        Ok(BatchResults { results })
353    }
354}
355
356/// Results from a batch execution
357pub struct BatchResults {
358    results: HashMap<u64, Result<Value>>,
359}
360
361impl BatchResults {
362    /// Get a result by its pending handle
363    pub fn get(&self, pending: &PendingResult) -> Result<Value> {
364        let value = self
365            .results
366            .get(&pending.id)
367            .ok_or_else(|| anyhow::anyhow!("No result for operation {}", pending.id))?
368            .as_ref()
369            .map_err(|e| anyhow::anyhow!("{}", e))?
370            .clone();
371
372        // Navigate path if specified
373        if pending.path.is_empty() {
374            Ok(value)
375        } else {
376            let mut current = value;
377            for segment in &pending.path {
378                current = current
379                    .get(segment)
380                    .ok_or_else(|| anyhow::anyhow!("Field '{}' not found in result", segment))?
381                    .clone();
382            }
383            Ok(current)
384        }
385    }
386
387    /// Check if a result exists
388    pub fn contains(&self, pending: &PendingResult) -> bool {
389        self.results.contains_key(&pending.id)
390    }
391
392    /// Get all results
393    pub fn all(&self) -> &HashMap<u64, Result<Value>> {
394        &self.results
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401
402    #[test]
403    fn test_client_creation() {
404        let config = ClientConfig::default();
405        let client = Client::new(config);
406        assert!(client.is_ok());
407    }
408
409    #[test]
410    fn test_batch_builder() {
411        let config = ClientConfig::default();
412        let client = Client::new(config).unwrap();
413        let mut batch = client.batch();
414
415        let result = batch.call(CapId::new(1), "test", vec![json!("arg")]);
416        assert_eq!(result.id, 1); // IDs start from 1 per Cap'n Web protocol
417
418        let pipelined = batch.pipeline(&result, vec!["field"], "method", vec![]);
419        assert_eq!(pipelined.id, 2);
420    }
421}