Struct google_cloud_pubsub::subscription::Subscription
source · [−]pub struct Subscription { /* private fields */ }Expand description
Subscription is a reference to a PubSub subscription.
Implementations
sourceimpl Subscription
impl Subscription
sourcepub fn id(&self) -> String
pub fn id(&self) -> String
id returns the unique identifier of the subscription within its project.
sourcepub fn fully_qualified_name(&self) -> &str
pub fn fully_qualified_name(&self) -> &str
fully_qualified_name returns the globally unique printable name of the subscription.
sourcepub async fn create(
&self,
fqtn: &str,
cfg: SubscriptionConfig,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<(), Status>
pub async fn create(
&self,
fqtn: &str,
cfg: SubscriptionConfig,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<(), Status>
create creates the subscription.
sourcepub async fn delete(
&self,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<(), Status>
pub async fn delete(
&self,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<(), Status>
delete deletes the subscription.
sourcepub async fn exists(
&self,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<bool, Status>
pub async fn exists(
&self,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<bool, Status>
exists reports whether the subscription exists on the server.
sourcepub async fn config(
&self,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<(String, SubscriptionConfig), Status>
pub async fn config(
&self,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<(String, SubscriptionConfig), Status>
config fetches the current configuration for the subscription.
sourcepub async fn update(
&self,
updating: SubscriptionConfigToUpdate,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<(String, SubscriptionConfig), Status>
pub async fn update(
&self,
updating: SubscriptionConfigToUpdate,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<(String, SubscriptionConfig), Status>
update changes an existing subscription according to the fields set in updating. It returns the new SubscriptionConfig.
sourcepub async fn pull(
&self,
max_messages: i32,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<Vec<ReceivedMessage>, Status>
pub async fn pull(
&self,
max_messages: i32,
cancel: Option<CancellationToken>,
retry: Option<RetrySetting>
) -> Result<Vec<ReceivedMessage>, Status>
pull gets messages synchronously. It blocks until at least one message is available.
sourcepub async fn receive<F>(
&self,
f: impl Fn(ReceivedMessage, CancellationToken) -> F + Send + 'static + Sync + Clone,
cancel: CancellationToken,
config: Option<ReceiveConfig>
) -> Result<(), Status>where
F: Future<Output = ()> + Send + 'static,
pub async fn receive<F>(
&self,
f: impl Fn(ReceivedMessage, CancellationToken) -> F + Send + 'static + Sync + Clone,
cancel: CancellationToken,
config: Option<ReceiveConfig>
) -> Result<(), Status>where
F: Future<Output = ()> + Send + 'static,
receive calls f with the outstanding messages from the subscription. It blocks until the cancellation token is cancelled or the service returns a non-retryable error. The standard way to terminate a receive is to cancel the CancellationToken.
sourcepub async fn ack(&self, ack_ids: Vec<String>) -> Result<(), Status>
pub async fn ack(&self, ack_ids: Vec<String>) -> Result<(), Status>
Ack acknowledges the messages associated with the ack_ids in the AcknowledgeRequest. The Pub/Sub system can remove the relevant messages from the subscription. This method is for batch acking.
use google_cloud_pubsub::client::Client;
use google_cloud_gax::cancel::CancellationToken;
use google_cloud_pubsub::subscription::Subscription;
use google_cloud_gax::grpc::Status;
use std::time::Duration;
// Example: receive messages on one task and acknowledge them in batches on another.
//
// The receive task forwards the ack id of each delivered message over an unbounded
// channel. The batch ack manager buffers those ids and acknowledges them when the
// buffer holds more than 10 ids, when no id arrives for 10 seconds, or on cancellation.
#[tokio::main]
async fn main() -> Result<(), Status> {
    let mut client = Client::default().await.unwrap();
    let subscription = client.subscription("test-subscription");
    let ctx = CancellationToken::new();
    let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
    let subscription_for_receive = subscription.clone();
    let ctx_for_receive = ctx.clone();
    let ctx_for_ack_manager = ctx.clone();

    // receive
    let handle = tokio::spawn(async move {
        let _ = subscription_for_receive
            .receive(
                move |message, _ctx| {
                    let sender = sender.clone();
                    async move {
                        // A send error only means the ack manager has already stopped.
                        let _ = sender.send(message.ack_id().to_string());
                    }
                },
                ctx_for_receive,
                None,
            )
            .await;
    });

    // batch ack manager
    let ack_manager = tokio::spawn(async move {
        let mut ack_ids = Vec::new();
        loop {
            tokio::select! {
                _ = ctx_for_ack_manager.cancelled() => {
                    // Flush whatever is still buffered; skip the request when there is nothing to ack.
                    if ack_ids.is_empty() {
                        return Ok(());
                    }
                    return subscription.ack(ack_ids).await;
                },
                r = tokio::time::timeout(Duration::from_secs(10), receiver.recv()) => match r {
                    Ok(Some(ack_id)) => {
                        ack_ids.push(ack_id);
                        if ack_ids.len() > 10 {
                            // `take` hands the batch to `ack` and leaves an empty buffer behind.
                            let _ = subscription.ack(std::mem::take(&mut ack_ids)).await;
                        }
                    },
                    Ok(None) => {
                        // Every sender is gone, so no further ack ids can arrive. Flush and stop
                        // instead of polling the closed channel again and again.
                        if ack_ids.is_empty() {
                            return Ok(());
                        }
                        return subscription.ack(ack_ids).await;
                    },
                    Err(_e) => {
                        // timeout
                        if !ack_ids.is_empty() {
                            let _ = subscription.ack(std::mem::take(&mut ack_ids)).await;
                        }
                    }
                }
            }
        }
    });

    ctx.cancel();

    // Wait for both tasks before returning: once `main` returns the runtime shuts down,
    // and the final batch ack might otherwise never complete.
    let _ = handle.await;
    ack_manager
        .await
        .unwrap_or_else(|e| Err(Status::internal(e.to_string())))
}
Trait Implementations
sourceimpl Clone for Subscription
impl Clone for Subscription
sourcefn clone(&self) -> Subscription
fn clone(&self) -> Subscription
1.0.0 · sourcefn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Auto Trait Implementations
impl !RefUnwindSafe for Subscription
impl Send for Subscription
impl Sync for Subscription
impl Unpin for Subscription
impl !UnwindSafe for Subscription
Blanket Implementations
sourceimpl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
const: unstable · sourcefn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
sourceimpl<T> Instrument for T
impl<T> Instrument for T
sourcefn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
sourcefn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
sourceimpl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
sourcefn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request