capnweb_transport/
http_batch.rs1use crate::{RpcTransport, TransportError};
2use async_trait::async_trait;
3use capnweb_core::Message;
4use std::collections::VecDeque;
5
/// RPC transport that buffers messages in memory and flushes them in batches.
///
/// Messages handed to `send` pile up in `outgoing` until `execute` drains them
/// and encodes the whole batch as one JSON array. `recv` hands back whatever is
/// waiting in `incoming`.
#[cfg(feature = "http-batch")]
pub struct HttpBatchTransport {
    /// Endpoint string supplied to `new`; returned unchanged by `endpoint()`.
    endpoint: String,
    /// Messages queued for the next batch, oldest at the front.
    outgoing: VecDeque<Message>,
    /// Messages waiting to be returned by `recv`, oldest at the front.
    incoming: VecDeque<Message>,
}
16
#[cfg(feature = "http-batch")]
impl HttpBatchTransport {
    /// Creates a transport for `endpoint` with both message queues empty.
    pub fn new(endpoint: String) -> Self {
        let outgoing = VecDeque::new();
        let incoming = VecDeque::new();
        Self {
            endpoint,
            outgoing,
            incoming,
        }
    }

    /// Flushes the outgoing queue as a single batch.
    ///
    /// Every queued message is removed from `outgoing` and the batch is encoded
    /// as one JSON array. Calling this with an empty queue does nothing.
    ///
    /// NOTE(review): the encoded bytes are currently dropped. Nothing is sent
    /// to `endpoint` and `incoming` is never filled here. Confirm whether the
    /// HTTP round trip is still to be implemented.
    ///
    /// # Errors
    ///
    /// Returns [`TransportError::Codec`] when the batch cannot be serialized.
    /// The messages have already been removed from the queue at that point and
    /// are not put back.
    pub async fn execute(&mut self) -> Result<(), TransportError> {
        if self.outgoing.is_empty() {
            return Ok(());
        }

        let batch = self.outgoing.drain(..).collect::<Vec<Message>>();

        match serde_json::to_vec(&batch) {
            // The payload is not used yet; see the review note above.
            Ok(_encoded) => Ok(()),
            Err(err) => Err(TransportError::Codec(err.to_string())),
        }
    }

    /// Returns the endpoint string this transport was created with.
    pub fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }

    /// Number of messages queued and not yet flushed by `execute`.
    pub fn pending_outgoing(&self) -> usize {
        let queued = &self.outgoing;
        queued.len()
    }

    /// Number of buffered messages not yet handed out by `recv`.
    pub fn pending_incoming(&self) -> usize {
        let buffered = &self.incoming;
        buffered.len()
    }
}
62
#[cfg(feature = "http-batch")]
#[async_trait]
impl RpcTransport for HttpBatchTransport {
    /// Queues `msg` for the next batch.
    ///
    /// Nothing is flushed here; the message stays in the outgoing queue until
    /// `execute` (or `close`) runs. This method never returns an error.
    async fn send(&mut self, msg: Message) -> Result<(), TransportError> {
        self.outgoing.push_back(msg);
        Ok(())
    }

    /// Returns the oldest buffered incoming message, or `Ok(None)` when the
    /// incoming queue is empty. This method never returns an error.
    async fn recv(&mut self) -> Result<Option<Message>, TransportError> {
        Ok(self.incoming.pop_front())
    }

    /// Flushes any pending outgoing messages, then empties both queues.
    ///
    /// # Errors
    ///
    /// Returns the error from the final `execute` call, if any. Both queues
    /// are cleared even when that flush fails, so the transport never keeps
    /// stale messages after `close` returns.
    async fn close(&mut self) -> Result<(), TransportError> {
        // `execute` is already a no-op on an empty queue, so no guard is needed.
        let flushed = self.execute().await;

        // Clear before reporting the outcome: returning early with `?` would
        // skip this cleanup and leave `incoming` populated after a failed close.
        self.outgoing.clear();
        self.incoming.clear();

        flushed
    }
}
85
#[cfg(all(test, feature = "http-batch"))]
mod tests {
    use super::*;
    use capnweb_core::{CallId, CapId, Target};
    use serde_json::json;

    /// `send` only enqueues: the message shows up in the outgoing count and
    /// `recv` still has nothing to return.
    #[tokio::test]
    async fn test_batch_transport_queue() {
        let mut transport = HttpBatchTransport::new("http://localhost:8080/rpc/batch".to_string());

        let msg = Message::call(
            CallId::new(1),
            Target::cap(CapId::new(42)),
            "test".to_string(),
            vec![json!("hello")],
        );

        // The message is not used again, so it is moved rather than cloned.
        transport.send(msg).await.unwrap();
        assert_eq!(transport.pending_outgoing(), 1);
        assert_eq!(transport.pending_incoming(), 0);

        assert!(transport.recv().await.unwrap().is_none());
    }

    /// The endpoint passed to `new` is returned unchanged.
    #[test]
    fn test_endpoint() {
        let transport = HttpBatchTransport::new("http://example.com/rpc".to_string());
        assert_eq!(transport.endpoint(), "http://example.com/rpc");
    }

    /// After `close`, neither queue holds any messages.
    #[tokio::test]
    async fn test_close_clears_queues() {
        let mut transport = HttpBatchTransport::new("http://localhost:8080/rpc/batch".to_string());

        let msg = Message::cap_ref(CapId::new(1));
        transport.send(msg).await.unwrap();

        // The result is ignored on purpose: only the queue state is checked.
        let _ = transport.close().await;

        assert_eq!(transport.pending_outgoing(), 0);
        assert_eq!(transport.pending_incoming(), 0);
    }
}