use crate::{RpcTransport, TransportError};
use async_trait::async_trait;
use capnweb_core::Message;
use std::collections::VecDeque;
#[cfg(feature = "http-batch")]
/// Batch-oriented transport that buffers messages in memory for one endpoint.
///
/// Messages passed to `send` accumulate in `outgoing` until `execute` drains
/// them; `recv` pops from `incoming`.
///
/// NOTE(review): the name suggests an HTTP round-trip, but nothing in the
/// visible code issues a request or pushes onto `incoming` — confirm whether
/// that is still to be implemented.
pub struct HttpBatchTransport {
/// Endpoint string supplied to `new` and handed back by `endpoint()`.
endpoint: String,
/// Messages queued by `send`, oldest first, awaiting the next `execute`.
outgoing: VecDeque<Message>,
/// Messages available to `recv`, oldest first.
incoming: VecDeque<Message>,
}
#[cfg(feature = "http-batch")]
impl HttpBatchTransport {
    /// Builds a transport for `endpoint` with both message queues empty.
    pub fn new(endpoint: String) -> Self {
        Self {
            outgoing: VecDeque::default(),
            incoming: VecDeque::default(),
            endpoint,
        }
    }

    /// Drains the outgoing queue and encodes the drained batch as JSON.
    ///
    /// Returns `Ok(())` immediately when nothing is queued.
    ///
    /// # Errors
    ///
    /// Returns [`TransportError::Codec`] carrying the serializer's message
    /// when the batch cannot be encoded. The queue has already been drained
    /// at that point, so the failed batch is not re-queued.
    pub async fn execute(&mut self) -> Result<(), TransportError> {
        if self.pending_outgoing() == 0 {
            return Ok(());
        }

        // Take ownership of everything queued so far, preserving order.
        let batch = self.outgoing.drain(..).collect::<Vec<Message>>();

        match serde_json::to_vec(&batch) {
            // The encoded body is currently unused: no request is sent here.
            Ok(_payload) => Ok(()),
            Err(err) => Err(TransportError::Codec(err.to_string())),
        }
    }

    /// Returns the endpoint string this transport was created with.
    pub fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }

    /// Number of messages queued by `send` that have not been drained yet.
    pub fn pending_outgoing(&self) -> usize {
        self.outgoing.len()
    }

    /// Number of messages waiting to be returned by `recv`.
    pub fn pending_incoming(&self) -> usize {
        self.incoming.len()
    }
}
#[cfg(feature = "http-batch")]
#[async_trait]
impl RpcTransport for HttpBatchTransport {
    /// Appends `msg` to the outgoing queue.
    ///
    /// Nothing is transmitted here; the message waits until `execute` (or
    /// `close`) drains the queue. This implementation always returns `Ok(())`.
    async fn send(&mut self, msg: Message) -> Result<(), TransportError> {
        self.outgoing.push_back(msg);
        Ok(())
    }

    /// Pops the oldest message from the incoming queue.
    ///
    /// Returns `Ok(None)` when the queue is empty; this implementation never
    /// returns an error.
    async fn recv(&mut self) -> Result<Option<Message>, TransportError> {
        Ok(self.incoming.pop_front())
    }

    /// Flushes any queued outgoing messages, then empties both queues.
    ///
    /// Both queues are cleared even when the final flush fails, so the
    /// transport never stays half-closed with stale incoming messages.
    ///
    /// # Errors
    ///
    /// Returns the error from the final `execute` call, after the queues
    /// have been cleared.
    async fn close(&mut self) -> Result<(), TransportError> {
        // `execute` returns `Ok(())` on an empty queue, so no guard is needed.
        let flushed = self.execute().await;

        // Clear unconditionally: an early return on a flush error would
        // otherwise leave `incoming` populated.
        self.outgoing.clear();
        self.incoming.clear();

        flushed
    }
}
#[cfg(all(test, feature = "http-batch"))]
mod tests {
    use super::*;
    use capnweb_core::{CallId, CapId, Target};
    use serde_json::json;

    /// A sent message is counted as pending outgoing and does not show up on
    /// the incoming side.
    #[tokio::test]
    async fn test_batch_transport_queue() {
        let mut transport = HttpBatchTransport::new("http://localhost:8080/rpc/batch".to_string());
        let msg = Message::call(
            CallId::new(1),
            Target::cap(CapId::new(42)),
            "test".to_string(),
            vec![json!("hello")],
        );

        // `msg` is not used again, so it is moved rather than cloned.
        transport.send(msg).await.unwrap();

        assert_eq!(transport.pending_outgoing(), 1);
        assert_eq!(transport.pending_incoming(), 0);
        assert!(transport.recv().await.unwrap().is_none());
    }

    /// The endpoint passed to `new` is returned unchanged by `endpoint()`.
    #[test]
    fn test_endpoint() {
        let transport = HttpBatchTransport::new("http://example.com/rpc".to_string());
        assert_eq!(transport.endpoint(), "http://example.com/rpc");
    }

    /// After `close`, neither queue holds any messages.
    #[tokio::test]
    async fn test_close_clears_queues() {
        let mut transport = HttpBatchTransport::new("http://localhost:8080/rpc/batch".to_string());
        let msg = Message::cap_ref(CapId::new(1));
        transport.send(msg).await.unwrap();

        // The result of `close` is ignored on purpose: only the queue state
        // afterwards is under test.
        let _ = transport.close().await;

        assert_eq!(transport.pending_outgoing(), 0);
        assert_eq!(transport.pending_incoming(), 0);
    }
}