Skip to main content

Sub

Struct Sub 

Source
pub struct Sub<T> { /* private fields */ }
Expand description

Subscriber wrapper that deserializes messages and optionally acknowledges them.

The subscriber relies on a SubCtxTrait implementation for message retrieval, a SubOptTrait provider for decoding and acknowledgment behavior, and an UnSubTrait handler for cancelling the subscription.

§Example

use std::sync::Arc;
use futures::StreamExt;
use serde::Deserialize;
use object_transfer::{Format, Sub};
use object_transfer::nats::{AckSubOptions, SubFetcher};
use object_transfer::traits::{SubTrait, UnSubTrait};

#[derive(Deserialize, Debug)]
struct Event {
  id: u64,
  name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  // Build a JetStream context and configure a durable pull consumer.
  let client = async_nats::connect("demo.nats.io").await?;
  let js = Arc::new(async_nats::jetstream::new(client));

  let options = Arc::new(
    AckSubOptions::new(Format::JSON, Arc::from("events"))
      .subjects(vec!["events.user_created"])
      .durable_name("user-created")
      .auto_ack(false),
  );

  // SubFetcher implements both SubCtxTrait and UnSubTrait.
  let fetcher = Arc::new(SubFetcher::new(js, options.clone()).await?);
  let unsub = fetcher.clone();

  let subscriber: Sub<Event> = Sub::new(fetcher, unsub, options);
  let mut stream = subscriber.subscribe().await?;

  while let Some(Ok((event, ack))) = stream.next().await {
    println!("received {:?}", event);
    // Manually ack since auto_ack(false).
    ack.ack().await?;
  }

  Ok(())
}

Implementations§

Source§

impl<T> Sub<T>
where T: DeserializeOwned + Send + Sync,

Source

pub fn new( ctx: Arc<dyn SubCtxTrait + Send + Sync>, unsub: Arc<dyn UnSubTrait + Send + Sync>, options: Arc<dyn SubOptTrait + Send + Sync>, ) -> Self

Creates a new subscriber from the provided message context, unsubscribe handler, and subscription options. All three arguments are required.

§Parameters
  • ctx: Message retrieval context responsible for producing raw items.
  • unsub: Unsubscribe handler to cancel the subscription when requested.
  • options: Subscription behavior such as auto-ack and payload format.

Trait Implementations§

Source§

impl<T> SubTrait for Sub<T>
where T: DeserializeOwned + Send + Sync,

Source§

fn subscribe<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<BoxStream<'_, Result<(Self::Item, Arc<dyn AckTrait + Send + Sync>), SubError>>, SubError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns a stream of decoded messages alongside their acknowledgment handles. When auto-acknowledgment is enabled, messages are acknowledged before being yielded to the consumer.

Source§

type Item = T

Source§

impl<T> UnSubTrait for Sub<T>
where T: DeserializeOwned + Send + Sync,

Source§

fn unsubscribe<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), UnSubError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Invokes the configured unsubscribe handler.

Auto Trait Implementations§

§

impl<T> Freeze for Sub<T>

§

impl<T> !RefUnwindSafe for Sub<T>

§

impl<T> Send for Sub<T>
where T: Send,

§

impl<T> Sync for Sub<T>
where T: Sync,

§

impl<T> Unpin for Sub<T>
where T: Unpin,

§

impl<T> !UnwindSafe for Sub<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more