pub struct Sub<T> { /* private fields */ }Expand description
Subscriber wrapper that deserializes messages and optionally acknowledges them.
The subscriber relies on a SubCtxTrait implementation for message
retrieval and a SubOptTrait provider for decoding and acknowledgment
behavior.
§Example
use std::sync::Arc;
use futures::StreamExt;
use serde::Deserialize;
use object_transfer::{Format, Sub};
use object_transfer::nats::{AckSubOptions, SubFetcher};
use object_transfer::traits::{SubTrait, UnSubTrait};
#[derive(Deserialize, Debug)]
struct Event {
id: u64,
name: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a JetStream context and configure a durable pull consumer.
let client = async_nats::connect("demo.nats.io").await?;
let js = Arc::new(async_nats::jetstream::new(client));
let options = Arc::new(
AckSubOptions::new(Format::JSON, Arc::from("events"))
.subjects(vec!["events.user_created"])
.durable_name("user-created")
.auto_ack(false),
);
// SubFetcher implements both SubCtxTrait and UnSubTrait.
let fetcher = Arc::new(SubFetcher::new(js, options.clone()).await?);
let unsub = fetcher.clone();
let subscriber: Sub<Event> = Sub::new(fetcher, unsub, options);
let mut stream = subscriber.subscribe().await?;
while let Some(Ok((event, ack))) = stream.next().await {
println!("received {:?}", event);
// Manually ack since auto_ack(false).
ack.ack().await?;
}
Ok(())
}Implementations§
Source§impl<T> Sub<T>
impl<T> Sub<T>
Sourcepub fn new(
ctx: Arc<dyn SubCtxTrait + Send + Sync>,
unsub: Arc<dyn UnSubTrait + Send + Sync>,
options: Arc<dyn SubOptTrait + Send + Sync>,
) -> Self
pub fn new( ctx: Arc<dyn SubCtxTrait + Send + Sync>, unsub: Arc<dyn UnSubTrait + Send + Sync>, options: Arc<dyn SubOptTrait + Send + Sync>, ) -> Self
Creates a new subscriber using the provided context, optional unsubscribe handler, and subscription options.
§Parameters
ctx: Message retrieval context responsible for producing raw items.unsub: Unsubscribe handler to cancel the subscription when requested.options: Subscription behavior such as auto-ack and payload format.
Trait Implementations§
Source§impl<T> SubTrait for Sub<T>
impl<T> SubTrait for Sub<T>
Source§fn subscribe<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<BoxStream<'_, Result<(Self::Item, Arc<dyn AckTrait + Send + Sync>), SubError>>, SubError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn subscribe<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<BoxStream<'_, Result<(Self::Item, Arc<dyn AckTrait + Send + Sync>), SubError>>, SubError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns a stream of decoded messages alongside their acknowledgment handles. When auto-acknowledgment is enabled, messages are acknowledged before being yielded to the consumer.
type Item = T
Source§impl<T> UnSubTrait for Sub<T>
impl<T> UnSubTrait for Sub<T>
Source§fn unsubscribe<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), UnSubError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn unsubscribe<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), UnSubError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Invokes the configured unsubscribe handler.
Auto Trait Implementations§
impl<T> Freeze for Sub<T>
impl<T> !RefUnwindSafe for Sub<T>
impl<T> Send for Sub<T>where
T: Send,
impl<T> Sync for Sub<T>where
T: Sync,
impl<T> Unpin for Sub<T>where
T: Unpin,
impl<T> !UnwindSafe for Sub<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more