use crate::api::{PullRequest, PushConfig, ReceivedMessage, Subscription as APISubscription};
use crate::client::Client;
use crate::error::Error;
use crate::message::Message;
use std::collections::VecDeque;
use tracing::error;
pub struct Subscription {
pub(crate) name: String,
pub(crate) client: Client,
pub(crate) buffer: VecDeque<ReceivedMessage>,
}
impl Subscription {
pub(crate) fn new(client: Client, name: impl Into<String>) -> Self {
Self {
client,
name: name.into(),
buffer: VecDeque::new(),
}
}
async fn pull(&mut self, max_messages: i32) -> Result<Vec<ReceivedMessage>, Error> {
let mut pr = PullRequest::default();
pr.subscription = self.name.clone();
pr.max_messages = max_messages;
let req = self.client.insert_authz_token(pr).await?;
let response = self.client.subscriber.pull(req).await?.into_inner();
Ok(response.received_messages)
}
pub async fn receive(&mut self) -> Option<Message> {
loop {
if let Some(handle) = self.buffer.pop_front() {
let pubsub_message = match handle.message {
Some(m) => m,
None => continue,
};
let timestamp = match pubsub_message.publish_time {
Some(t) => t,
None => continue,
};
let message = Message {
client: self.client.clone(),
subscription_name: self.name.clone(),
data: pubsub_message.data,
message_id: pubsub_message.message_id,
ack_id: handle.ack_id,
attributes: pubsub_message.attributes,
publish_time: chrono::NaiveDateTime::from_timestamp(
timestamp.seconds,
timestamp.nanos as u32,
),
};
break Some(message);
} else {
match self.pull(50).await {
Ok(messages) => {
if messages.is_empty() {
break None;
}
self.buffer.extend(messages)
}
Err(err) => {
error!("pull message failed{}", err);
continue;
}
};
}
}
}
}