1use anyhow::{Context, Result};
9use capnweb_core::CapId;
10use reqwest::Client as HttpClient;
11use serde_json::{json, Value};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use tokio::sync::RwLock;
16use tracing::{debug, trace};
17
/// Connection settings used to construct a [`Client`].
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// URL that batch requests are POSTed to.
    pub url: String,
    /// Maximum number of messages per batch.
    ///
    /// NOTE(review): no code visible in this file enforces this limit —
    /// confirm whether it is consumed elsewhere.
    pub max_batch_size: usize,
    /// Timeout applied to the HTTP client, in milliseconds.
    pub timeout_ms: u64,
}
28
29impl Default for ClientConfig {
30 fn default() -> Self {
31 Self {
32 url: "http://localhost:3000/rpc/batch".to_string(),
33 max_batch_size: 100,
34 timeout_ms: 30000,
35 }
36 }
37}
38
39pub struct Client {
41 config: ClientConfig,
42 http_client: HttpClient,
43 next_call_id: AtomicU64,
44 capabilities: Arc<RwLock<HashMap<CapId, Value>>>,
45}
46
47impl Client {
48 pub fn new(config: ClientConfig) -> Result<Self> {
50 let http_client = HttpClient::builder()
51 .timeout(std::time::Duration::from_millis(config.timeout_ms))
52 .build()
53 .context("Failed to build HTTP client")?;
54
55 Ok(Self {
56 config,
57 http_client,
58 next_call_id: AtomicU64::new(1), capabilities: Arc::new(RwLock::new(HashMap::new())),
60 })
61 }
62
63 pub fn new_with_url(url: &str) -> Result<Self> {
65 let config = ClientConfig {
66 url: url.to_string(),
67 ..Default::default()
68 };
69 Self::new(config)
70 }
71
72 pub fn batch(&self) -> BatchBuilder<'_> {
74 BatchBuilder::new(self)
75 }
76
77 pub async fn call(&self, cap_id: CapId, method: &str, args: Vec<Value>) -> Result<Value> {
79 let import_id = self.next_call_id.fetch_add(1, Ordering::SeqCst);
81
82 let messages = vec![
84 json!(["push", ["call", cap_id.as_u64(), [method], args]]),
85 json!(["pull", import_id]),
86 ];
87
88 let results = self.send_batch(messages).await?;
89
90 for result in results {
92 if let Some(arr) = result.as_array() {
93 if arr.len() >= 3
94 && (arr[0] == "result" || arr[0] == "resolve")
95 && arr[1] == import_id
96 {
97 return Ok(arr[2].clone());
98 } else if arr.len() >= 3 && arr[0] == "error" && arr[1] == import_id {
99 let error = arr[2]
100 .as_object()
101 .and_then(|o| o.get("message"))
102 .and_then(|m| m.as_str())
103 .unwrap_or("Unknown error");
104 return Err(anyhow::anyhow!("RPC error: {}", error));
105 }
106 }
107 }
108
109 Err(anyhow::anyhow!("No result received for call"))
110 }
111
112 async fn send_batch(&self, messages: Vec<Value>) -> Result<Vec<Value>> {
114 let body_parts: Result<Vec<String>> = messages
116 .iter()
117 .map(|m| serde_json::to_string(m).context("Failed to serialize message"))
118 .collect();
119 let body = body_parts?.join("\n");
120
121 debug!(
122 "Sending batch request to {}: {} messages",
123 self.config.url,
124 messages.len()
125 );
126 trace!("Request body:\n{}", body);
127
128 let response = self
129 .http_client
130 .post(&self.config.url)
131 .header("Content-Type", "text/plain")
132 .body(body)
133 .send()
134 .await
135 .context("Failed to send HTTP request")?;
136
137 let status = response.status();
138 let text = response
139 .text()
140 .await
141 .context("Failed to read response body")?;
142
143 if !status.is_success() {
144 return Err(anyhow::anyhow!("HTTP error {}: {}", status, text));
145 }
146
147 trace!("Response body:\n{}", text);
148
149 let mut results = Vec::new();
151 for line in text.lines() {
152 let line = line.trim();
153 if !line.is_empty() {
154 let value: Value = serde_json::from_str(line)
155 .with_context(|| format!("Failed to parse response line: {}", line))?;
156 results.push(value);
157 }
158 }
159
160 debug!("Received {} response messages", results.len());
161 Ok(results)
162 }
163
164 pub async fn register_capability(&self, id: CapId, cap: Value) {
166 let mut caps = self.capabilities.write().await;
167 caps.insert(id, cap);
168 }
169
170 pub async fn get_capability(&self, id: CapId) -> Option<Value> {
172 let caps = self.capabilities.read().await;
173 caps.get(&id).cloned()
174 }
175
176 pub async fn dispose_capability(&self, id: CapId) -> Result<()> {
178 let messages = vec![json!(["dispose", id.as_u64()])];
179
180 self.send_batch(messages).await?;
181
182 let mut caps = self.capabilities.write().await;
183 caps.remove(&id);
184
185 Ok(())
186 }
187}
188
189pub struct BatchBuilder<'a> {
191 client: &'a Client,
192 operations: Vec<BatchOperation>,
193 next_result_id: u64,
194}
195
196#[derive(Debug, Clone)]
198pub struct BatchOperation {
199 pub id: u64,
200 pub message: Value,
201 pub is_pipeline: bool,
202 pub depends_on: Option<u64>,
203}
204
/// Handle to the (not yet available) result of a batched operation.
#[derive(Clone, Debug)]
pub struct PendingResult {
    /// ID of the operation whose result this handle refers to.
    pub id: u64,
    /// Field names to descend through inside the result; empty means the
    /// whole value.
    pub path: Vec<String>,
}
211
212impl<'a> BatchBuilder<'a> {
213 fn new(client: &'a Client) -> Self {
214 Self {
215 client,
216 operations: Vec::new(),
217 next_result_id: 1, }
219 }
220
221 pub fn call(&mut self, cap_id: CapId, method: &str, args: Vec<Value>) -> PendingResult {
223 let result_id = self.next_result_id;
224 self.next_result_id += 1;
225
226 let message = json!(["push", ["call", cap_id.as_u64(), [method], args]]);
227
228 self.operations.push(BatchOperation {
229 id: result_id,
230 message,
231 is_pipeline: false,
232 depends_on: None,
233 });
234
235 PendingResult {
236 id: result_id,
237 path: vec![],
238 }
239 }
240
241 pub fn pipeline(
243 &mut self,
244 base: &PendingResult,
245 path: Vec<&str>,
246 method: &str,
247 args: Vec<Value>,
248 ) -> PendingResult {
249 let result_id = self.next_result_id;
250 self.next_result_id += 1;
251
252 let mut pipeline_args = Vec::new();
254
255 if !path.is_empty() {
257 let path_strings: Vec<Value> = path.iter().map(|s| json!(s)).collect();
259 pipeline_args.push(json!(["pipeline", base.id, path_strings]));
260 }
261
262 pipeline_args.extend(args);
264
265 let message = json!([
267 "push",
268 ["pipeline", base.id, vec![json!(method)], pipeline_args]
269 ]);
270
271 self.operations.push(BatchOperation {
272 id: result_id,
273 message,
274 is_pipeline: true,
275 depends_on: Some(base.id),
276 });
277
278 PendingResult {
279 id: result_id,
280 path: vec![],
281 }
282 }
283
284 pub fn reference(&self, result: &PendingResult, field: &str) -> PendingResult {
286 PendingResult {
287 id: result.id,
288 path: {
289 let mut path = result.path.clone();
290 path.push(field.to_string());
291 path
292 },
293 }
294 }
295
296 pub async fn execute(self) -> Result<BatchResults> {
298 if self.operations.is_empty() {
299 return Ok(BatchResults {
300 results: HashMap::new(),
301 });
302 }
303
304 let mut messages = Vec::new();
306
307 for op in &self.operations {
309 messages.push(op.message.clone());
310 }
311
312 for op in &self.operations {
314 messages.push(json!(["pull", op.id]));
315 }
316
317 let responses = self.client.send_batch(messages).await?;
319
320 let mut results = HashMap::new();
322
323 for response in responses {
324 if let Some(arr) = response.as_array() {
325 if arr.len() >= 2 {
326 let msg_type = arr[0].as_str().unwrap_or("");
327
328 match msg_type {
329 "result" | "resolve" => {
330 if let Some(id) = arr[1].as_u64() {
331 let value = arr.get(2).cloned().unwrap_or(json!(null));
332 results.insert(id, Ok(value));
333 }
334 }
335 "error" => {
336 if let Some(id) = arr[1].as_u64() {
337 let error_obj = arr.get(2).cloned().unwrap_or(json!({}));
338 let error_msg = error_obj
339 .get("message")
340 .and_then(|m| m.as_str())
341 .unwrap_or("Unknown error");
342 results
343 .insert(id, Err(anyhow::anyhow!("RPC error: {}", error_msg)));
344 }
345 }
346 _ => {}
347 }
348 }
349 }
350 }
351
352 Ok(BatchResults { results })
353 }
354}
355
356pub struct BatchResults {
358 results: HashMap<u64, Result<Value>>,
359}
360
361impl BatchResults {
362 pub fn get(&self, pending: &PendingResult) -> Result<Value> {
364 let value = self
365 .results
366 .get(&pending.id)
367 .ok_or_else(|| anyhow::anyhow!("No result for operation {}", pending.id))?
368 .as_ref()
369 .map_err(|e| anyhow::anyhow!("{}", e))?
370 .clone();
371
372 if pending.path.is_empty() {
374 Ok(value)
375 } else {
376 let mut current = value;
377 for segment in &pending.path {
378 current = current
379 .get(segment)
380 .ok_or_else(|| anyhow::anyhow!("Field '{}' not found in result", segment))?
381 .clone();
382 }
383 Ok(current)
384 }
385 }
386
387 pub fn contains(&self, pending: &PendingResult) -> bool {
389 self.results.contains_key(&pending.id)
390 }
391
392 pub fn all(&self) -> &HashMap<u64, Result<Value>> {
394 &self.results
395 }
396}
397
#[cfg(test)]
mod tests {
    use super::*;

    /// A client can be constructed from the default configuration.
    #[test]
    fn test_client_creation() {
        let outcome = Client::new(ClientConfig::default());
        assert!(outcome.is_ok());
    }

    /// Queued operations receive consecutive IDs starting at 1.
    #[test]
    fn test_batch_builder() {
        let client = Client::new(ClientConfig::default()).unwrap();
        let mut batch = client.batch();

        let first = batch.call(CapId::new(1), "test", vec![json!("arg")]);
        assert_eq!(first.id, 1);

        let second = batch.pipeline(&first, vec!["field"], "method", vec![]);
        assert_eq!(second.id, 2);
    }
}