capnweb_transport/
http_batch.rs

1use crate::{RpcTransport, TransportError};
2use async_trait::async_trait;
3use capnweb_core::Message;
4use std::collections::VecDeque;
5
/// Transport that buffers RPC messages locally so that they can be sent to
/// the server together as a single HTTP request.
#[cfg(feature = "http-batch")]
pub struct HttpBatchTransport {
    /// URL of the batch RPC endpoint.
    endpoint: String,
    /// Messages queued by `send`, in FIFO order, awaiting the next `execute`.
    outgoing: VecDeque<Message>,
    /// Messages waiting to be handed out by `recv`, in FIFO order.
    incoming: VecDeque<Message>,
}
16
#[cfg(feature = "http-batch")]
impl HttpBatchTransport {
    /// Creates a transport bound to `endpoint` with both queues empty.
    pub fn new(endpoint: String) -> Self {
        Self {
            outgoing: VecDeque::new(),
            incoming: VecDeque::new(),
            endpoint,
        }
    }

    /// Flushes the outgoing queue as one batch.
    ///
    /// This is still a placeholder: no HTTP request is issued. The queued
    /// messages are removed from the outgoing queue and JSON-encoded to
    /// validate their structure; the encoded bytes are then discarded and
    /// nothing is added to the incoming queue. Calling this with an empty
    /// outgoing queue does nothing.
    ///
    /// # Errors
    ///
    /// Returns [`TransportError::Codec`] when the batch cannot be serialized.
    /// The messages drained for that batch are not put back.
    pub async fn execute(&mut self) -> Result<(), TransportError> {
        if !self.outgoing.is_empty() {
            // Take ownership of everything queued so far.
            let batch = self.outgoing.drain(..).collect::<Vec<Message>>();

            // TODO: use a real HTTP client here. A full implementation would
            // POST the encoded batch to `self.endpoint`, parse the response,
            // and push the decoded replies onto `self.incoming`.
            if let Err(err) = serde_json::to_vec(&batch) {
                return Err(TransportError::Codec(err.to_string()));
            }
        }

        Ok(())
    }

    /// Returns the endpoint URL this transport was created with.
    pub fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }

    /// Returns how many messages are queued and not yet executed.
    pub fn pending_outgoing(&self) -> usize {
        self.outgoing.len()
    }

    /// Returns how many messages are queued for `recv`.
    pub fn pending_incoming(&self) -> usize {
        self.incoming.len()
    }
}
62
#[cfg(feature = "http-batch")]
#[async_trait]
impl RpcTransport for HttpBatchTransport {
    /// Appends `msg` to the outgoing queue; nothing is transmitted until
    /// `execute` (or `close`) runs.
    async fn send(&mut self, msg: Message) -> Result<(), TransportError> {
        self.outgoing.push_back(msg);
        Ok(())
    }

    /// Pops the oldest message from the incoming queue, or returns `None`
    /// when that queue is empty. Never waits for new messages.
    async fn recv(&mut self) -> Result<Option<Message>, TransportError> {
        Ok(self.incoming.pop_front())
    }

    /// Flushes any pending outgoing messages, then empties both queues.
    ///
    /// # Errors
    ///
    /// Propagates the error from `execute` if the final flush fails. Both
    /// queues are cleared even in that case, so a closed transport never
    /// retains stale messages.
    async fn close(&mut self) -> Result<(), TransportError> {
        // Hold on to the flush outcome instead of returning early with `?`,
        // so that a failed flush cannot skip the cleanup below.
        let flushed = if self.outgoing.is_empty() {
            Ok(())
        } else {
            self.execute().await
        };

        self.outgoing.clear();
        self.incoming.clear();

        flushed
    }
}
85
#[cfg(all(test, feature = "http-batch"))]
mod tests {
    use super::*;
    use capnweb_core::{CallId, CapId, Target};
    use serde_json::json;

    /// Sending only enqueues: the message is counted as pending outgoing and
    /// nothing becomes available to `recv`.
    #[tokio::test]
    async fn test_batch_transport_queue() {
        let mut batch = HttpBatchTransport::new("http://localhost:8080/rpc/batch".to_string());

        let call = Message::call(
            CallId::new(1),
            Target::cap(CapId::new(42)),
            "test".to_string(),
            vec![json!("hello")],
        );

        batch.send(call).await.unwrap();
        assert_eq!(batch.pending_outgoing(), 1);
        assert_eq!(batch.pending_incoming(), 0);

        // Nothing has been executed, so there is nothing to receive.
        let received = batch.recv().await.unwrap();
        assert!(received.is_none());
    }

    /// The accessor returns the URL given to the constructor.
    #[test]
    fn test_endpoint() {
        let url = "http://example.com/rpc";
        let batch = HttpBatchTransport::new(url.to_string());
        assert_eq!(batch.endpoint(), "http://example.com/rpc");
    }

    /// After `close`, neither queue holds any messages.
    #[tokio::test]
    async fn test_close_clears_queues() {
        let mut batch = HttpBatchTransport::new("http://localhost:8080/rpc/batch".to_string());

        batch.send(Message::cap_ref(CapId::new(1))).await.unwrap();

        // `close` flushes pending messages through `execute` first. Its
        // result is deliberately ignored: this test only checks that the
        // queues end up empty.
        let _ = batch.close().await;

        assert_eq!(batch.pending_outgoing(), 0);
        assert_eq!(batch.pending_incoming(), 0);
    }
}