mod in_memory;
mod manager;
pub use in_memory::{InMemoryQueueReader, InMemoryQueueWriter};
pub use manager::EventQueueManager;
use std::future::Future;
use std::pin::Pin;
#[allow(unused_imports)] use a2a_protocol_types::error::A2aError;
use a2a_protocol_types::error::A2aResult;
use a2a_protocol_types::events::StreamResponse;
use tokio::sync::{broadcast, mpsc};
/// Channel capacity used by [`new_in_memory_queue`] when the caller does not
/// pick one (64 slots).
pub const DEFAULT_QUEUE_CAPACITY: usize = 64;
/// Default `max_event_size` handed to the in-memory writer: 16 MiB.
///
/// NOTE(review): how the writer measures an event's size is defined in the
/// `in_memory` module, not here — confirm before relying on the unit.
pub const DEFAULT_MAX_EVENT_SIZE: usize = 16 * 1024 * 1024;
/// Default `write_timeout` handed to the in-memory writer: 5 seconds.
pub const DEFAULT_WRITE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
/// Write half of an event queue.
///
/// Implementations must be `Send + Sync + 'static`. Both methods take `&self`
/// and return a boxed, pinned future instead of being declared `async fn`, so
/// the trait can be used behind `dyn EventQueueWriter`.
pub trait EventQueueWriter: Send + Sync + 'static {
/// Submits `event` to the queue.
///
/// The returned future borrows `self` for `'a` and resolves to `Ok(())` or
/// an `A2aError`; the specific failure conditions are up to the implementation.
fn write<'a>(
&'a self,
event: StreamResponse,
) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
/// Closes the queue.
///
/// NOTE(review): presumably signals end-of-stream to readers — the exact
/// effect is implementation-defined; confirm against the implementor.
fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
}
/// Read half of an event queue.
///
/// Implementations must be `Send + 'static`. Reading requires `&mut self`, so
/// a given reader has at most one outstanding `read` future at a time.
pub trait EventQueueReader: Send + 'static {
/// Waits for the next item from the queue.
///
/// The future resolves to `Some(Ok(event))` for an event, `Some(Err(_))` for
/// an error, or `None`.
///
/// NOTE(review): `None` presumably means the queue is closed and drained —
/// confirm against the implementor.
fn read(
&mut self,
) -> Pin<Box<dyn Future<Output = Option<A2aResult<StreamResponse>>> + Send + '_>>;
}
/// Builds an in-memory writer/reader pair configured entirely from the module
/// defaults: [`DEFAULT_QUEUE_CAPACITY`], [`DEFAULT_MAX_EVENT_SIZE`] and
/// [`DEFAULT_WRITE_TIMEOUT`].
#[must_use]
pub fn new_in_memory_queue() -> (InMemoryQueueWriter, InMemoryQueueReader) {
    let capacity = DEFAULT_QUEUE_CAPACITY;
    let max_event_size = DEFAULT_MAX_EVENT_SIZE;
    let write_timeout = DEFAULT_WRITE_TIMEOUT;
    new_in_memory_queue_with_options(capacity, max_event_size, write_timeout)
}
/// Builds an in-memory writer/reader pair with a caller-chosen `capacity`.
///
/// The event-size limit and write timeout fall back to
/// [`DEFAULT_MAX_EVENT_SIZE`] and [`DEFAULT_WRITE_TIMEOUT`]; `capacity` is
/// forwarded as given to [`new_in_memory_queue_with_options`].
#[must_use]
pub fn new_in_memory_queue_with_capacity(
    capacity: usize,
) -> (InMemoryQueueWriter, InMemoryQueueReader) {
    let max_event_size = DEFAULT_MAX_EVENT_SIZE;
    let write_timeout = DEFAULT_WRITE_TIMEOUT;
    new_in_memory_queue_with_options(capacity, max_event_size, write_timeout)
}
/// Builds an in-memory writer/reader pair with every option spelled out.
///
/// The two halves are joined by a `tokio::sync::broadcast` channel of the
/// requested `capacity`. `max_event_size` and `write_timeout` are passed
/// through to [`InMemoryQueueWriter::new`].
///
/// A `capacity` of `0` is raised to `1`: `broadcast::channel` panics on a
/// zero capacity, and a one-slot queue is the smallest usable configuration.
///
/// # Panics
///
/// Panics if `capacity` is larger than `usize::MAX / 2`, the upper bound
/// enforced by `tokio::sync::broadcast::channel`.
#[must_use]
pub fn new_in_memory_queue_with_options(
    capacity: usize,
    max_event_size: usize,
    write_timeout: std::time::Duration,
) -> (InMemoryQueueWriter, InMemoryQueueReader) {
    // Guard against the zero-capacity panic inside `broadcast::channel`.
    let broadcast_capacity = capacity.max(1);
    let (tx, rx) = broadcast::channel(broadcast_capacity);
    let writer = InMemoryQueueWriter::new(tx, max_event_size, write_timeout);
    let reader = InMemoryQueueReader::new(rx);
    (writer, reader)
}
/// Builds an in-memory writer/reader pair plus a receiver for a secondary
/// persistence channel.
///
/// The writer and reader are joined by a `tokio::sync::broadcast` channel of
/// the requested `capacity`. In addition, a bounded `tokio::sync::mpsc`
/// channel is created; its sender is handed to
/// [`InMemoryQueueWriter::new_with_persistence`] and its receiver is returned
/// as the third tuple element.
///
/// NOTE(review): the writer presumably mirrors each event onto the
/// persistence channel — confirm in the `in_memory` module.
///
/// The persistence buffer is sized at `capacity * 16` (saturating) with a
/// floor of 1024 slots.
///
/// A `capacity` of `0` is raised to `1` for the broadcast channel, because
/// `broadcast::channel` panics on a zero capacity.
///
/// # Panics
///
/// Panics if `capacity` is larger than `usize::MAX / 2`, the upper bound
/// enforced by `tokio::sync::broadcast::channel`.
#[must_use]
pub fn new_in_memory_queue_with_persistence(
    capacity: usize,
    max_event_size: usize,
    write_timeout: std::time::Duration,
) -> (
    InMemoryQueueWriter,
    InMemoryQueueReader,
    mpsc::Receiver<A2aResult<StreamResponse>>,
) {
    // Guard against the zero-capacity panic inside `broadcast::channel`.
    let broadcast_capacity = capacity.max(1);
    let (tx, rx) = broadcast::channel(broadcast_capacity);

    // The floor of 1024 also keeps the mpsc buffer non-zero.
    let persistence_buffer = capacity.saturating_mul(16).max(1024);
    let (persistence_tx, persistence_rx) = mpsc::channel(persistence_buffer);

    let writer = InMemoryQueueWriter::new_with_persistence(
        tx,
        persistence_tx,
        max_event_size,
        write_timeout,
    );
    let reader = InMemoryQueueReader::new(rx);
    (writer, reader, persistence_rx)
}
#[cfg(test)]
mod tests {
    use super::*;

    // These are smoke tests: each one only checks that the factory can be
    // called and yields a writer/reader pair without panicking.

    #[test]
    fn new_in_memory_queue_returns_pair() {
        let (_tx_half, _rx_half) = new_in_memory_queue();
    }

    #[test]
    fn new_in_memory_queue_with_capacity_returns_pair() {
        let requested_capacity = 128;
        let (_tx_half, _rx_half) = new_in_memory_queue_with_capacity(requested_capacity);
    }

    #[test]
    fn new_in_memory_queue_with_options_returns_pair() {
        let one_second = std::time::Duration::from_secs(1);
        let (_tx_half, _rx_half) = new_in_memory_queue_with_options(32, 1024, one_second);
    }
}