Struct EventQueueManager 

pub struct EventQueueManager { /* private fields */ }

Manages event queues for active tasks.

Each task can have at most one active writer. Multiple readers can subscribe to the same writer concurrently (fan-out), enabling SubscribeToTask to work even when another SSE stream is active.

Implementations

impl EventQueueManager

pub fn new() -> EventQueueManager

Creates a new, empty event queue manager with default capacity.

Examples
use a2a_protocol_server::EventQueueManager;

let manager = EventQueueManager::new();

pub fn with_capacity(capacity: usize) -> EventQueueManager

Creates a new event queue manager with the specified channel capacity.

pub const fn with_write_timeout(self, timeout: Duration) -> EventQueueManager

Sets the write timeout for event queue sends.

Retained for API compatibility. Broadcast-based queues do not block on writes, so this value is not actively used for backpressure.

pub const fn with_max_event_size(self, max_event_size: usize) -> EventQueueManager

Sets the maximum event size.

Events exceeding this size (in serialized bytes) will be rejected with an error to prevent OOM conditions.
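
The size limit can be pictured as a length check on the serialized bytes before an event is enqueued. The following is a minimal sketch of that idea, not the crate's code: `check_event_size` and `EventTooLarge` are hypothetical names, and the real method reports failures through the crate's own error type.

```rust
/// Hypothetical error for this sketch; the crate itself uses its own error type.
#[derive(Debug, PartialEq)]
pub struct EventTooLarge {
    /// Size of the rejected event in serialized bytes.
    pub size: usize,
    /// Configured maximum event size in bytes.
    pub max: usize,
}

/// Rejects a serialized event whose byte length exceeds `max_event_size`.
/// An event exactly at the limit is accepted.
pub fn check_event_size(serialized: &[u8], max_event_size: usize) -> Result<(), EventTooLarge> {
    let size = serialized.len();
    if size > max_event_size {
        Err(EventTooLarge { size, max: max_event_size })
    } else {
        Ok(())
    }
}
```

Checking the serialized length before enqueueing keeps a single oversized payload from being cloned into every subscriber's view of the queue.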

pub fn with_metrics(self, metrics: Arc<dyn Metrics>) -> EventQueueManager

Sets the metrics hook for reporting queue depth changes.

pub const fn with_max_concurrent_queues(self, max: usize) -> EventQueueManager

Sets the maximum number of concurrent event queues.

When the limit is reached, creating a new queue yields None in place of a reader, which signals capacity exhaustion.
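
The `with_*` setters above take `self` by value and return the manager, so they chain, and the `const fn` ones can be evaluated at compile time. The sketch below illustrates that consuming-builder shape on a stand-in type. `QueueConfig`, its fields, and its default values are assumptions made for illustration and do not describe the private fields of `EventQueueManager`.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the manager's configuration.
/// Field names and defaults are assumptions, not the crate's internals.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct QueueConfig {
    pub capacity: usize,
    pub write_timeout: Duration,
    pub max_event_size: usize,
    pub max_concurrent_queues: Option<usize>,
}

impl QueueConfig {
    /// Starts from assumed defaults with the given channel capacity.
    pub const fn with_capacity(capacity: usize) -> Self {
        Self {
            capacity,
            write_timeout: Duration::from_secs(5),
            max_event_size: 1024 * 1024,
            max_concurrent_queues: None,
        }
    }

    /// Consumes `self`, updates one field, and returns the value for chaining.
    pub const fn with_write_timeout(mut self, timeout: Duration) -> Self {
        self.write_timeout = timeout;
        self
    }

    pub const fn with_max_event_size(mut self, max_event_size: usize) -> Self {
        self.max_event_size = max_event_size;
        self
    }

    pub const fn with_max_concurrent_queues(mut self, max: usize) -> Self {
        self.max_concurrent_queues = Some(max);
        self
    }
}

/// Because every setter is a `const fn`, the whole chain can build a constant.
pub const CONFIG: QueueConfig = QueueConfig::with_capacity(256)
    .with_max_event_size(64 * 1024)
    .with_max_concurrent_queues(1_000);
```

With the real type, the equivalent chain would start from `EventQueueManager::new()` or `EventQueueManager::with_capacity(..)` and apply the setters documented above.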

pub async fn get_or_create(&self, task_id: &TaskId) -> (Arc<InMemoryQueueWriter>, Option<InMemoryQueueReader>)

Returns the writer for the given task, creating a new queue if none exists.

If a queue already exists, the returned reader is None (callers should use subscribe() to get additional readers for existing queues). If a new queue is created, both the writer and the first reader are returned.

If max_concurrent_queues is set and the limit has been reached, the method returns the writer with a None reader, the same shape as the existing-queue case.
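
The three outcomes described above (new queue, existing queue, limit reached) can be modelled with a synchronous toy built on the standard library. This is a sketch under stated assumptions, not the crate's implementation: `ToyManager` is a hypothetical name, plain `std::sync::mpsc` channels stand in for the broadcast queue, and handing back an unregistered writer at capacity is a guess, since the source only states that the reader is None.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Toy stand-in for the manager; every name here is hypothetical.
pub struct ToyManager {
    writers: HashMap<String, Arc<Sender<String>>>,
    max_queues: Option<usize>,
}

impl ToyManager {
    pub fn new(max_queues: Option<usize>) -> Self {
        Self { writers: HashMap::new(), max_queues }
    }

    /// Returns the writer for `task_id`, plus the first reader only when a
    /// new queue was actually created.
    pub fn get_or_create(&mut self, task_id: &str) -> (Arc<Sender<String>>, Option<Receiver<String>>) {
        // Existing queue: share the writer, hand out no reader.
        if let Some(writer) = self.writers.get(task_id) {
            return (Arc::clone(writer), None);
        }

        let (tx, rx) = channel();
        let writer = Arc::new(tx);

        // Assumption: at capacity the queue is not registered and the reader
        // is withheld, so the caller sees `None` as in the existing-queue case.
        let at_capacity = matches!(self.max_queues, Some(max) if self.writers.len() >= max);
        if at_capacity {
            return (writer, None);
        }

        // New queue: register the writer and return the first reader.
        self.writers.insert(task_id.to_owned(), Arc::clone(&writer));
        (writer, Some(rx))
    }

    pub fn active_count(&self) -> usize {
        self.writers.len()
    }
}
```

Because the existing-queue and limit-reached cases both yield a None reader, a caller that needs to tell them apart has to consult something else, such as `subscribe()` returning None for a task that has no queue.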

pub async fn get_or_create_with_persistence(&self, task_id: &TaskId) -> (Arc<InMemoryQueueWriter>, Option<InMemoryQueueReader>, Option<Receiver<Result<StreamResponse, A2aError>>>)

Like get_or_create, but also creates a dedicated persistence channel for the background event processor.

Returns (writer, Option<sse_reader>, Option<persistence_rx>). The persistence receiver is only returned when a new queue is created (not for existing queues). The persistence channel is independent of the broadcast channel and is not affected by slow SSE consumers.
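
To see why a separate persistence channel matters, consider a writer that feeds two sinks: a bounded buffer that overwrites its oldest entries when a reader falls behind, and a dedicated channel that never drops. The sketch below models that split with the standard library only. `DualWriter` and `persistence_demo` are hypothetical names, and the overwrite-oldest behaviour of the broadcast side is an assumption, since the source does not spell out how slow SSE consumers are handled.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical writer feeding a lossy bounded buffer (standing in for the
/// broadcast side) and a lossless persistence channel.
pub struct DualWriter {
    sse_buffer: VecDeque<u32>,
    sse_capacity: usize,
    persistence_tx: Sender<u32>,
}

impl DualWriter {
    pub fn new(sse_capacity: usize) -> (Self, Receiver<u32>) {
        assert!(sse_capacity > 0, "capacity must be non-zero");
        let (persistence_tx, persistence_rx) = channel();
        let writer = Self {
            sse_buffer: VecDeque::with_capacity(sse_capacity),
            sse_capacity,
            persistence_tx,
        };
        (writer, persistence_rx)
    }

    pub fn write(&mut self, event: u32) {
        // Lossy side: evict the oldest event once the buffer is full.
        if self.sse_buffer.len() == self.sse_capacity {
            self.sse_buffer.pop_front();
        }
        self.sse_buffer.push_back(event);
        // Lossless side: ignore the error if the persistence task has exited.
        let _ = self.persistence_tx.send(event);
    }

    /// Events a reader that has not consumed anything yet could still observe.
    pub fn sse_backlog(&self) -> Vec<u32> {
        self.sse_buffer.iter().copied().collect()
    }
}

/// Writes five events while nothing is consumed, then compares both sinks.
pub fn persistence_demo() -> (Vec<u32>, Vec<u32>) {
    let (mut writer, persistence_rx) = DualWriter::new(2);
    for event in 1..=5 {
        writer.write(event);
    }
    let sse = writer.sse_backlog();
    let persisted: Vec<u32> = persistence_rx.try_iter().collect();
    (sse, persisted)
}
```

In this model the lossy side retains only the last two events while the persistence side receives all five, which is the independence the description above refers to.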

pub async fn subscribe(&self, task_id: &TaskId) -> Option<InMemoryQueueReader>

Creates a new reader for an existing task’s event queue.

Returns None if no queue exists for the given task. The returned reader will receive all future events written to the queue.

This enables SubscribeToTask (resubscribe) to work even when another SSE stream is already consuming events from the same queue.
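
The fan-out behaviour, including the fact that a late subscriber sees only future events, can be illustrated with a small synchronous model. This is a sketch, not the crate's mechanism: `FanOutWriter` and `fan_out_demo` are hypothetical names, and one `std::sync::mpsc` channel per reader stands in for the broadcast queue.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out writer: one sender per subscribed reader.
#[derive(Default)]
pub struct FanOutWriter {
    readers: Vec<Sender<String>>,
}

impl FanOutWriter {
    /// Registers a new reader. It receives only events written after this call.
    pub fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.readers.push(tx);
        rx
    }

    /// Delivers a copy of the event to every live reader and forgets readers
    /// whose receiving end has been dropped.
    pub fn write(&mut self, event: &str) {
        self.readers.retain(|tx| tx.send(event.to_owned()).is_ok());
    }
}

/// One reader subscribes before the first event, another after it.
pub fn fan_out_demo() -> (Vec<String>, Vec<String>) {
    let mut writer = FanOutWriter::default();
    let first = writer.subscribe();
    writer.write("working");
    let late = writer.subscribe();
    writer.write("completed");
    (first.try_iter().collect(), late.try_iter().collect())
}
```

Here the first reader observes both events, while the late reader observes only the one written after it subscribed.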

pub async fn destroy(&self, task_id: &TaskId)

Removes and drops the event queue for the given task.

pub async fn active_count(&self) -> usize

Returns the number of active event queues.

pub async fn destroy_all(&self)

Removes all event queues, causing all readers to see EOF.
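
The EOF behaviour follows the usual channel rule that a reader sees end-of-stream once every sender is gone. The sketch below shows that rule with `std::sync::mpsc` as an analogy. `destroy_all_demo` is a hypothetical name, and the assumption that already-buffered events can still be drained before EOF holds for the standard library channel but is not stated by the source for this crate.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender, TryRecvError};

/// Drops every writer held in a map and reports what the reader observes.
pub fn destroy_all_demo() -> (Vec<u32>, bool) {
    let mut queues: HashMap<String, Sender<u32>> = HashMap::new();
    let (tx, rx) = channel();
    tx.send(7).expect("reader is still alive");
    queues.insert("task-a".to_owned(), tx);

    // Analogue of `destroy_all`: clearing the map drops every sender.
    queues.clear();

    // Events buffered before the drop are still delivered...
    let drained: Vec<u32> = rx.try_iter().collect();
    // ...after which the reader sees a disconnect, i.e. EOF.
    let saw_eof = matches!(rx.try_recv(), Err(TryRecvError::Disconnected));
    (drained, saw_eof)
}
```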

Trait Implementations

impl Clone for EventQueueManager

fn clone(&self) -> EventQueueManager

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for EventQueueManager

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter.

impl Default for EventQueueManager

fn default() -> EventQueueManager

Returns the “default value” for a type.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FromRef<T> for T
where T: Clone,

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.