use crate::{EncodableMessage, Topic, ValidatedMessage};
use futures_util::{
ready,
sink::{Sink, SinkExt},
};
use pin_project::pin_project;
use std::{
collections::{BTreeMap, VecDeque},
error::Error as StdError,
fmt,
pin::Pin,
task::{Context, Poll},
time::SystemTime,
};
use ya_gcp::pubsub;
use super::{
retry_policy::{
exponential_backoff::Config as ExponentialBackoffConfig, ExponentialBackoff,
RetryOperation, RetryPolicy,
},
BoxError, Connect, DefaultConnector, MakeConnection, PubSubError, TopicName, Uri,
};
use message_translate::{TopicSink, TopicSinkError};
#[derive(Debug)]
struct Shared<T>(std::sync::Arc<parking_lot::Mutex<T>>);
impl<T> Shared<T> {
fn new(t: T) -> Self {
Self(std::sync::Arc::new(parking_lot::Mutex::new(t)))
}
fn borrow_mut(&self) -> impl std::ops::DerefMut<Target = T> + '_ {
self.0
.try_lock()
.unwrap_or_else(|| panic!("unexpected overlapping borrow of shared state"))
}
}
impl<T> Clone for Shared<T> {
fn clone(&self) -> Self {
Self(std::sync::Arc::clone(&self.0))
}
}
#[derive(Debug, Clone)]
pub struct PublisherClient<C = DefaultConnector> {
client: pubsub::PublisherClient<C>,
project: String,
identifier: String,
}
impl<C> PublisherClient<C> {
pub(super) fn new(
client: pubsub::PublisherClient<C>,
project: String,
identifier: String,
) -> Self {
PublisherClient {
client,
project,
identifier,
}
}
fn project(&self) -> &str {
&self.project
}
fn identifier(&self) -> &str {
&self.identifier
}
}
#[derive(Debug)]
pub enum PublishError<M: EncodableMessage, E> {
Publish {
cause: PubSubError,
messages: Vec<M>,
},
Response(E),
InvalidMessage {
cause: M::Error,
message: M,
},
}
impl<M: EncodableMessage, E> fmt::Display for PublishError<M, E>
where
M::Error: fmt::Display,
E: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PublishError::Publish { messages, .. } => f.write_fmt(format_args!(
"could not publish {} messages",
messages.len()
)),
PublishError::Response(..) => f.write_str(
"could not forward response for a successfully published message to the sink",
),
PublishError::InvalidMessage { .. } => f.write_str("could not validate message"),
}
}
}
impl<M: EncodableMessage, E> StdError for PublishError<M, E>
where
M: fmt::Debug,
M::Error: StdError + 'static,
E: StdError + 'static,
{
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
PublishError::Publish { cause, .. } => Some(cause as &_),
PublishError::Response(cause) => Some(cause as &_),
PublishError::InvalidMessage { cause, .. } => Some(cause as &_),
}
}
}
impl<M: EncodableMessage, E> From<TopicSinkError<M, E>> for PublishError<M, E> {
fn from(from: TopicSinkError<M, E>) -> Self {
match from {
TopicSinkError::Publish(cause, messages) => PublishError::Publish { cause, messages },
TopicSinkError::Response(err) => PublishError::Response(err),
}
}
}
impl<C> PublisherClient<C>
where
C: MakeConnection<Uri> + ya_gcp::Connect + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send + 'static,
C::Future: Send + 'static,
BoxError: From<C::Error>,
{
pub async fn create_topic(&mut self, topic: TopicConfig<'_>) -> Result<(), PubSubError> {
let topic = topic.into_topic(self);
self.client.create_topic(topic).await?;
Ok(())
}
pub async fn delete_topic(&mut self, topic: TopicName<'_>) -> Result<(), PubSubError> {
let topic = topic.into_project_topic_name(self.project()).into();
self.client
.delete_topic(pubsub::api::DeleteTopicRequest { topic })
.await?;
Ok(())
}
pub fn publisher(&self) -> Publisher<C> {
Publisher {
client: self.clone(),
retry_policy: ExponentialBackoff::new(
pubsub::PubSubRetryCheck::default(),
ExponentialBackoffConfig::default(),
),
}
}
}
pub struct Publisher<C, R = ExponentialBackoff<pubsub::PubSubRetryCheck>> {
client: PublisherClient<C>,
retry_policy: R,
}
impl<C, OldR> Publisher<C, OldR> {
pub fn with_retry_policy<R, M>(self, retry_policy: R) -> Publisher<C, R>
where
R: RetryPolicy<[M], PubSubError> + Clone,
M: EncodableMessage,
{
Publisher {
retry_policy,
client: self.client,
}
}
}
impl<C, M, S, R> crate::publisher::Publisher<M, S> for Publisher<C, R>
where
C: Connect + Clone + Send + Sync + 'static,
M: EncodableMessage + Send + 'static,
S: Sink<M> + Send + 'static,
R: RetryPolicy<[M], PubSubError> + Clone + 'static,
R::RetryOp: Send + 'static,
<R::RetryOp as RetryOperation<[M], PubSubError>>::Sleep: Send + 'static,
{
type PublishError = PublishError<M, S::Error>;
type PublishSink = PublishSink<C, M, S, R>;
fn publish_sink_with_responses(
self,
validator: M::Validator,
response_sink: S,
) -> Self::PublishSink {
PublishSink {
topic_sinks: BTreeMap::new(),
validator,
buffer: None,
client: self.client,
retry_policy: self.retry_policy,
response_sink: Shared::new(Box::pin(response_sink)),
_p: std::marker::PhantomPinned,
}
}
}
match_fields! {
pubsub::api::Topic =>
#[derive(Debug, Clone)]
pub struct TopicConfig<'s> {
pub name: TopicName<'s>,
pub labels: std::collections::HashMap<String, String>,
pub message_storage_policy: Option<pubsub::api::MessageStoragePolicy>,
pub kms_key_name: String,
pub message_retention_duration: Option<pubsub::api::Duration>,
@except:
schema_settings,
satisfies_pzs,
}
}
impl<'s> TopicConfig<'s> {
fn into_topic<C>(self, client: &PublisherClient<C>) -> pubsub::api::Topic {
pubsub::api::Topic {
name: self.name.into_project_topic_name(client.project()).into(),
labels: self.labels,
message_storage_policy: self.message_storage_policy,
kms_key_name: self.kms_key_name,
message_retention_duration: self.message_retention_duration,
schema_settings: None, satisfies_pzs: false, }
}
}
impl<'s> Default for TopicConfig<'s> {
fn default() -> Self {
Self {
name: TopicName::new(String::new()),
labels: std::collections::HashMap::new(),
message_storage_policy: None,
kms_key_name: String::new(),
message_retention_duration: None,
}
}
}
#[pin_project]
pub struct PublishSink<C, M: EncodableMessage, S: Sink<M>, R> {
#[allow(clippy::type_complexity)] topic_sinks: BTreeMap<Topic, Pin<Box<TopicSink<C, M, S, R>>>>,
validator: M::Validator,
buffer: Option<M>,
client: PublisherClient<C>,
retry_policy: R,
response_sink: Shared<Pin<Box<S>>>,
_p: std::marker::PhantomPinned,
}
impl<C, M, S, R> Sink<M> for PublishSink<C, M, S, R>
where
C: Connect + Clone + Send + Sync + 'static,
M: EncodableMessage + Send + 'static,
S: Sink<M> + Send + 'static,
R: RetryPolicy<[M], PubSubError> + Clone + 'static,
R::RetryOp: Send + 'static,
<R::RetryOp as RetryOperation<[M], PubSubError>>::Sleep: Send + 'static,
{
type Error = PublishError<M, S::Error>;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let client = this.client;
match this.buffer.as_ref() {
Some(msg) => {
let topic = msg.topic();
let sink = {
let retry_policy = this.retry_policy;
let response_sink = this.response_sink;
this.topic_sinks.entry(topic.clone()).or_insert_with(|| {
Box::pin(TopicSink::new(
client.client.publish_topic_sink(
TopicName::new(topic.as_ref())
.into_project_topic_name(client.project()),
),
retry_policy.clone(),
Shared::clone(response_sink),
))
})
};
ready!(sink.poll_ready_unpin(cx))?;
let message = this.buffer.take().expect("already check Some");
let validated = match message.encode(this.validator) {
Ok(validated_msg) => validated_msg,
Err(err) => {
return Poll::Ready(Err(PublishError::InvalidMessage {
cause: err,
message,
}))
}
};
let api_message = match hedwig_to_pubsub(validated, client.identifier()) {
Ok(api_message) => api_message,
Err(err) => {
return Poll::Ready(Err(PublishError::Publish {
cause: err,
messages: vec![message],
}))
}
};
sink.start_send_unpin((message, api_message))?;
Poll::Ready(Ok(()))
}
None => Poll::Ready(Ok(())),
}
}
fn start_send(self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
if self.project().buffer.replace(item).is_some() {
panic!("each `start_send` must be preceded by a successful call to `poll_ready`")
}
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_ready(cx))?;
let mut all_ready = true;
for sink in self.topic_sinks.values_mut() {
all_ready &= sink.poll_flush_unpin(cx)?.is_ready();
}
if all_ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_flush(cx))?;
let mut all_ready = true;
for sink in self.topic_sinks.values_mut() {
all_ready &= sink.poll_close_unpin(cx)?.is_ready();
}
if all_ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}
fn hedwig_to_pubsub(
mut msg: ValidatedMessage,
publisher_id: &str,
) -> Result<pubsub::api::PubsubMessage, PubSubError> {
let mut attributes = std::mem::take(msg.headers_mut());
if let Some(invalid_key) = attributes.keys().find(|key| key.starts_with("hedwig_")) {
return Err(PubSubError::invalid_argument(format!(
"keys starting with \"hedwig_\" are reserved: {}",
invalid_key
)));
}
attributes.insert(crate::HEDWIG_ID.into(), msg.uuid().to_string());
attributes.insert(
crate::HEDWIG_MESSAGE_TIMESTAMP.into(),
msg.timestamp()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|_| {
PubSubError::invalid_argument(format!(
"timestamp should be after UNIX epoch: {:?}",
msg.timestamp()
))
})?
.as_millis()
.to_string(),
);
attributes.insert(crate::HEDWIG_SCHEMA.into(), msg.schema().into());
attributes.insert(crate::HEDWIG_PUBLISHER.into(), publisher_id.into());
attributes.insert(crate::HEDWIG_FORMAT_VERSION.into(), "1.0".into());
Ok(pubsub::api::PubsubMessage {
data: msg.into_data(),
attributes,
..pubsub::api::PubsubMessage::default()
})
}
mod message_translate {
use super::*;
struct TranslateBuffer<M> {
buf: VecDeque<M>,
}
impl<M> TranslateBuffer<M> {
const PUBLISH_BUFFER_SIZE: usize = 1000;
fn new() -> Self {
Self {
buf: VecDeque::with_capacity(Self::PUBLISH_BUFFER_SIZE),
}
}
fn add_message(&mut self, user_message: M) {
self.buf.push_back(user_message)
}
fn remove_success(&mut self, _api_message: pubsub::api::PubsubMessage) -> M {
self.buf
.pop_front()
.expect("translate buffer should be in sync with publish buffer")
}
fn remove_errors(
&mut self,
error: pubsub::PublishError,
) -> (PubSubError, impl Iterator<Item = M> + '_) {
(error.source, self.buf.drain(0..error.messages.len()))
}
fn view_messages(&mut self, api_messages: &[pubsub::api::PubsubMessage]) -> &[M] {
&self.buf.make_contiguous()[0..api_messages.len()]
}
}
#[pin_project]
pub(super) struct TopicSink<C, M, S: Sink<M>, R> {
user_messages: Shared<TranslateBuffer<M>>,
#[pin]
pubsub_sink: pubsub::PublishTopicSink<C, TranslateRetryPolicy<M, R>, TranslateSink<M, S>>,
}
pub(super) enum TopicSinkError<M, E> {
Publish(PubSubError, Vec<M>),
Response(E),
}
impl<C, M, S: Sink<M>, R> TopicSink<C, M, S, R>
where
S: Sink<M>,
R: RetryPolicy<[M], PubSubError>,
{
pub(super) fn new(
pubsub_sink: pubsub::PublishTopicSink<C>,
retry_policy: R,
response_sink: Shared<Pin<Box<S>>>,
) -> Self {
let user_messages = Shared::new(TranslateBuffer::new());
Self {
user_messages: Shared::clone(&user_messages),
pubsub_sink: pubsub_sink
.with_retry_policy(TranslateRetryPolicy {
user_messages: Shared::clone(&user_messages),
user_retry: retry_policy,
})
.with_response_sink(TranslateSink {
user_messages,
user_sink: response_sink,
}),
}
}
fn translate_poll_fn<F>(
self: Pin<&mut Self>,
poll_fn: F,
cx: &mut Context,
) -> Poll<Result<(), TopicSinkError<M, S::Error>>>
where
F: FnOnce(
Pin<
&mut pubsub::PublishTopicSink<
C,
TranslateRetryPolicy<M, R>,
TranslateSink<M, S>,
>,
>,
&mut Context,
) -> Poll<Result<(), pubsub::SinkError<S::Error>>>,
{
let this = self.project();
let user_messages = this.user_messages;
poll_fn(this.pubsub_sink, cx).map_err(|err| match err {
pubsub::SinkError::Publish(publish_error) => {
let mut user_messages = user_messages.borrow_mut();
let (source, messages) = user_messages.remove_errors(publish_error);
TopicSinkError::Publish(source, messages.collect())
}
pubsub::SinkError::Response(response_error) => {
TopicSinkError::Response(response_error)
}
})
}
}
impl<C, M, S: Sink<M>, R> Sink<(M, pubsub::api::PubsubMessage)> for TopicSink<C, M, S, R>
where
C: Connect + Clone + Send + Sync + 'static,
R: RetryPolicy<[M], PubSubError> + 'static,
R::RetryOp: Send + 'static,
<R::RetryOp as RetryOperation<[M], PubSubError>>::Sleep: Send + 'static,
S: Sink<M> + Send + 'static,
M: EncodableMessage + Send + 'static,
{
type Error = TopicSinkError<M, S::Error>;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.translate_poll_fn(pubsub::PublishTopicSink::poll_ready, cx)
}
fn start_send(
self: Pin<&mut Self>,
(user_message, api_message): (M, pubsub::api::PubsubMessage),
) -> Result<(), Self::Error> {
let this = self.project();
match this.pubsub_sink.start_send(api_message) {
Ok(()) => {
this.user_messages.borrow_mut().add_message(user_message);
Ok(())
}
Err(err) => Err(match err {
pubsub::SinkError::Publish(publish_error) => {
assert_eq!(publish_error.messages.len(), 1);
TopicSinkError::Publish(publish_error.source, vec![user_message])
}
pubsub::SinkError::Response(_) => {
unreachable!("response sink should not be used in start_send")
}
}),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.translate_poll_fn(pubsub::PublishTopicSink::poll_flush, cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.translate_poll_fn(pubsub::PublishTopicSink::poll_close, cx)
}
}
struct TranslateRetryPolicy<M, R> {
user_messages: Shared<TranslateBuffer<M>>,
user_retry: R,
}
impl<M, R> RetryPolicy<pubsub::api::PublishRequest, PubSubError> for TranslateRetryPolicy<M, R>
where
R: RetryPolicy<[M], PubSubError>,
{
type RetryOp = TranslateRetryOp<M, R::RetryOp>;
fn new_operation(&mut self) -> Self::RetryOp {
TranslateRetryOp {
user_messages: Shared::clone(&self.user_messages),
user_retry_op: self.user_retry.new_operation(),
}
}
}
struct TranslateRetryOp<M, O> {
user_messages: Shared<TranslateBuffer<M>>,
user_retry_op: O,
}
impl<M, O> RetryOperation<pubsub::api::PublishRequest, PubSubError> for TranslateRetryOp<M, O>
where
O: RetryOperation<[M], PubSubError>,
{
type Sleep = O::Sleep;
fn check_retry(
&mut self,
failed_value: &pubsub::api::PublishRequest,
error: &PubSubError,
) -> Option<Self::Sleep> {
let mut user_messages = self.user_messages.borrow_mut();
let failed_messages = user_messages.view_messages(&failed_value.messages);
self.user_retry_op.check_retry(failed_messages, error)
}
}
struct TranslateSink<M, S: Sink<M>> {
user_messages: Shared<TranslateBuffer<M>>,
user_sink: Shared<Pin<Box<S>>>,
}
impl<M, S> Sink<pubsub::api::PubsubMessage> for TranslateSink<M, S>
where
S: Sink<M>,
{
type Error = S::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.user_sink.borrow_mut().poll_ready_unpin(cx)
}
fn start_send(
self: Pin<&mut Self>,
api_message: pubsub::api::PubsubMessage,
) -> Result<(), Self::Error> {
let user_message = self.user_messages.borrow_mut().remove_success(api_message);
self.user_sink.borrow_mut().start_send_unpin(user_message)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.user_sink.borrow_mut().poll_flush_unpin(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.user_sink.borrow_mut().poll_close_unpin(cx)
}
}
}