Skip to main content

ObservingEventStore

Struct ObservingEventStore 

Source
pub struct ObservingEventStore<S, F> { /* private fields */ }
Expand description

An EventStore decorator that invokes a callback on every appended envelope, then delegates all storage to an inner store.

This is the reusable, recommended way to “stream to stdout” (or to any live observer) with the SDK. The agent loop writes every AgentEventEnvelope through the configured event store, so wrapping that store lets you watch events as they happen — for example, printing TextDeltas or forwarding them to a UI channel — without hand-rolling the full five-method EventStore surface or wiring up an in-process channel. The callback runs before the inner store records the envelope.

§Example

use agent_sdk_tools::stores::{InMemoryEventStore, ObservingEventStore};
use agent_sdk_foundation::events::AgentEvent;

let _store = ObservingEventStore::new(InMemoryEventStore::new(), |envelope| {
    if let AgentEvent::TextDelta { delta, .. } = &envelope.event {
        print!("{delta}");
    }
});

Implementations§

Source§

impl<S, F> ObservingEventStore<S, F>

Source

pub const fn new(inner: S, observer: F) -> ObservingEventStore<S, F>

Wrap inner, calling observer on every appended envelope before it is persisted.

Source

pub const fn inner(&self) -> &S

Borrow the wrapped inner store (e.g. to read back persisted history).

Trait Implementations§

Source§

impl<S, F> EventStore for ObservingEventStore<S, F>

Source§

fn append<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 ThreadId, turn: usize, envelope: AgentEventEnvelope, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, ObservingEventStore<S, F>: 'async_trait,

Append an event envelope for the given thread and turn. Read more
Source§

fn finish_turn<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 ThreadId, turn: usize, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, ObservingEventStore<S, F>: 'async_trait,

Mark the given turn as finished and flush any buffered writes. Read more
Source§

fn get_turn<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 ThreadId, turn: usize, ) -> Pin<Box<dyn Future<Output = Result<Option<StoredTurnEvents>, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, ObservingEventStore<S, F>: 'async_trait,

Retrieve the stored data for a single turn. Read more
Source§

fn get_turns<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 ThreadId, ) -> Pin<Box<dyn Future<Output = Result<Vec<StoredTurnEvents>, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, ObservingEventStore<S, F>: 'async_trait,

Retrieve all stored turns for the given thread in ascending turn order. Read more
Source§

fn get_events<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 ThreadId, ) -> Pin<Box<dyn Future<Output = Result<Vec<AgentEventEnvelope>, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, ObservingEventStore<S, F>: 'async_trait,

Retrieve all event envelopes for the given thread across every stored turn. Read more
Source§

fn event_count<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 ThreadId, ) -> Pin<Box<dyn Future<Output = Result<usize, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, ObservingEventStore<S, F>: 'async_trait,

Count the stored events for thread_id without materializing them. Read more
Source§

fn get_events_since<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 ThreadId, offset: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<AgentEventEnvelope>, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, ObservingEventStore<S, F>: 'async_trait,

Retrieve event envelopes for thread_id from offset onward, in overall append order, skipping the earlier ones. Read more
Source§

fn clear<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 ThreadId, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, ObservingEventStore<S, F>: 'async_trait,

Clear all events for the given thread. Read more

Auto Trait Implementations§

§

impl<S, F> Freeze for ObservingEventStore<S, F>
where S: Freeze, F: Freeze,

§

impl<S, F> RefUnwindSafe for ObservingEventStore<S, F>

§

impl<S, F> Send for ObservingEventStore<S, F>
where S: Send, F: Send,

§

impl<S, F> Sync for ObservingEventStore<S, F>
where S: Sync, F: Sync,

§

impl<S, F> Unpin for ObservingEventStore<S, F>
where S: Unpin, F: Unpin,

§

impl<S, F> UnsafeUnpin for ObservingEventStore<S, F>
where S: UnsafeUnpin, F: UnsafeUnpin,

§

impl<S, F> UnwindSafe for ObservingEventStore<S, F>
where S: UnwindSafe, F: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
where ST: ?Sized, DT: ?Sized,

Source§

impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
where ST: ?Sized, DT: ?Sized,

Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FutureExt for T

Source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Read<Exclusive, BecauseExclusive> for T
where T: ?Sized,

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more