mod certs;
pub mod api {
include!("api/google.pubsub.v1.rs");
}
use anyhow::Result;
use api::{subscriber_client::SubscriberClient, AcknowledgeRequest, PullRequest, ReceivedMessage};
use google_auth::DefaultCredentials;
use log::debug;
use std::collections::HashMap;
use std::collections::VecDeque;
use tonic::{
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig},
Request,
};
const ENDPOINT: &str = "https://pubsub.googleapis.com";
pub struct Pubsub {
project_id: String,
}
async fn create_subscriber_client() -> Result<SubscriberClient<Channel>> {
let pem = certs::get_pem();
let tls_config = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(&pem))
.domain_name("pubsub.googleapis.com");
let channel = Channel::from_static(ENDPOINT)
.tls_config(tls_config)?
.connect()
.await?;
let mut cred = DefaultCredentials::new();
let token = cred.token().await?;
let bearer_token = format!("Bearer {}", token.access_token);
let header_value = MetadataValue::from_str(&bearer_token)?;
let subscriber_client =
SubscriberClient::with_interceptor(channel, move |mut req: Request<()>| {
req.metadata_mut()
.insert("authorization", header_value.clone());
Ok(req)
});
Ok(subscriber_client)
}
pub struct Subscription {
project_id: String,
subscription_name: String,
client: SubscriberClient<Channel>,
buffer: VecDeque<ReceivedMessage>,
}
#[derive(Clone)]
pub struct Message {
pub(crate) client: SubscriberClient<Channel>,
pub(crate) data: Vec<u8>,
pub(crate) attributes: HashMap<String, String>,
pub(crate) ack_id: String,
pub(crate) message_id: String,
pub(crate) publish_time: chrono::NaiveDateTime,
pub(crate) subscription_name: String,
}
impl Message {
pub async fn ack(&mut self) -> Result<()> {
let req = AcknowledgeRequest {
subscription: self.subscription_name.clone(),
ack_ids: vec![self.ack_id.clone()],
};
self.client.acknowledge(req).await?;
Ok(())
}
pub fn data(&self) -> &[u8] {
self.data.as_slice()
}
pub fn id(&self) -> &str {
self.message_id.as_str()
}
pub fn attributes(&self) -> &HashMap<String, String> {
&self.attributes
}
}
impl Subscription {
pub async fn pull(&mut self, max_messages: i32) -> Result<Vec<ReceivedMessage>> {
let mut retry_counter: i32 = 0;
let max_retry = 10;
let response = loop {
if retry_counter >= max_retry {
debug!("retry exceed max_retry: {}", retry_counter);
return Err(anyhow::anyhow!("gg"));
}
let mut pr = PullRequest::default();
pr.subscription = format!(
"projects/{}/subscriptions/{}",
&self.project_id, &self.subscription_name
);
pr.max_messages = max_messages;
let result = self.client.pull(pr).await;
let res = match result {
Ok(res) => res.into_inner(),
Err(err) if err.code() == tonic::Code::Unauthenticated => {
retry_counter += 1;
debug!("unauthenticated: {}, retry ... #{}", err, retry_counter);
self.client = create_subscriber_client().await?;
continue;
}
Err(err) => {
retry_counter += 1;
debug!("error: {}, retry ... #{}", err, retry_counter);
continue;
}
};
break res;
};
Ok(response.received_messages)
}
pub async fn receive(&mut self) -> Option<Message> {
loop {
if let Some(handle) = self.buffer.pop_front() {
let pubsub_message = handle.message.unwrap();
let timestamp = pubsub_message.publish_time.unwrap();
let message = Message {
client: self.client.clone(),
subscription_name: self.subscription_name.clone(),
data: pubsub_message.data,
message_id: pubsub_message.message_id,
ack_id: handle.ack_id,
attributes: pubsub_message.attributes,
publish_time: chrono::NaiveDateTime::from_timestamp(
timestamp.seconds,
timestamp.nanos as u32,
),
};
break Some(message);
} else {
if let Ok(messages) = self.pull(50).await {
if messages.is_empty() {
break None;
}
self.buffer.extend(messages)
}
}
}
}
}
impl Pubsub {
pub fn new(project_id: &str) -> Self {
Self {
project_id: project_id.to_string(),
}
}
pub async fn subscription(&self, subscription_name: &str) -> Result<Subscription> {
Ok(Subscription {
project_id: self.project_id.clone(),
subscription_name: subscription_name.to_string(),
client: create_subscriber_client().await?,
buffer: VecDeque::new(),
})
}
pub async fn pull(
&self,
subscription_name: &str,
max_messages: i32,
) -> Result<Vec<ReceivedMessage>> {
let mut retry_counter: i32 = 0;
let max_retry = 10;
let mut subscriber_client = create_subscriber_client().await?;
let response = loop {
if retry_counter >= max_retry {
debug!("retry exceed max_retry: {}", retry_counter);
return Err(anyhow::anyhow!("gg"));
}
let mut pr = PullRequest::default();
pr.subscription = format!(
"projects/{}/subscriptions/{}",
&self.project_id, subscription_name
);
pr.max_messages = max_messages;
let result = subscriber_client.pull(pr).await;
let res = match result {
Ok(res) => res.into_inner(),
Err(err) if err.code() == tonic::Code::Unauthenticated => {
subscriber_client = create_subscriber_client().await?;
debug!("unauthenticated: {}", err);
retry_counter += 1;
continue;
}
Err(err) => {
debug!("error: {}", err);
retry_counter += 1;
continue;
}
};
break res;
};
Ok(response.received_messages)
}
pub async fn ack(&mut self, ack_ids: Vec<String>, subscription_name: &str) -> Result<()> {
let mut subscriber_client = create_subscriber_client().await?;
subscriber_client
.acknowledge(AcknowledgeRequest {
ack_ids,
subscription: format!(
"projects/{}/subscriptions/{}",
&self.project_id, subscription_name
),
})
.await?
.into_inner();
Ok(())
}
pub async fn create_subscription(
&mut self,
topic_name: &str,
subscription_name: &str,
filter: &str,
) -> Result<()> {
let mut subscriber_client = create_subscriber_client().await?;
let subs = api::Subscription {
name: format!(
"projects/{}/subscriptions/{}",
&self.project_id, subscription_name
),
ack_deadline_seconds: 10,
dead_letter_policy: None,
detached: false,
enable_message_ordering: true,
expiration_policy: None,
filter: filter.to_string(),
labels: HashMap::new(),
message_retention_duration: None,
push_config: None,
retain_acked_messages: false,
retry_policy: None,
topic: format!("projects/{}/topics/{}", &self.project_id, topic_name),
};
let resp = subscriber_client.create_subscription(subs).await?;
debug!("{:?}", resp);
Ok(())
}
pub async fn delete_subscription(&mut self, subscription_name: &str) -> Result<()> {
let mut subscriber_client = create_subscriber_client().await?;
let req = api::DeleteSubscriptionRequest {
subscription: format!(
"projects/{}/subscriptions/{}",
&self.project_id, subscription_name
),
};
let resp = subscriber_client.delete_subscription(req).await?;
debug!("{:?}", resp);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn new_works() {
let project_id = "test-123";
Pubsub::new(project_id);
}
}