use anyhow::anyhow;
use futures::channel::oneshot;
use futures::future::{self, AbortHandle};
use futures::prelude::*;
use logimesh::server::{self, Channel};
use logimesh::tokio_serde::formats::Json;
use logimesh::transport::tcp;
use logimesh::{client, context};
use opentelemetry::trace::TracerProvider as _;
use publisher::Publisher as _;
use std::collections::HashMap;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, RwLock};
use subscriber::Subscriber as _;
use tokio::net::ToSocketAddrs;
use tracing::info;
use tracing_subscriber::prelude::*;
/// Defines the `Subscriber` component: the RPC interface the publisher calls on each connected
/// subscriber.
pub mod subscriber {
#[logimesh::component]
pub trait Subscriber {
/// Returns the topics the subscriber wants to be registered for.
async fn topics() -> Vec<String>;
/// Delivers one `message` that was published under `topic`.
async fn receive(topic: String, message: String);
}
}
/// Defines the `Publisher` component: the RPC interface used to publish messages.
pub mod publisher {
#[logimesh::component]
pub trait Publisher {
/// Publishes `message` under `topic`.
async fn publish(topic: String, message: String);
}
}
/// Server-side state of a single subscriber.
#[derive(Debug, Clone)]
struct Subscriber {
    /// Local address of the connection to the publisher; included in log output.
    local_addr: SocketAddr,
    /// Topics reported back when the `topics` RPC is called.
    topics: Vec<String>,
}
impl subscriber::Subscriber for Subscriber {
    /// Reports the topics this subscriber was constructed with.
    async fn topics(self, _: context::Context) -> Vec<String> {
        // `self` is owned here, so the list can be handed out directly.
        let Self { topics, .. } = self;
        topics
    }

    /// Handles a delivered message by logging it together with this subscriber's address.
    async fn receive(self, _: context::Context, topic: String, message: String) {
        let local_addr = self.local_addr;
        info!(%local_addr, %topic, %message, "ReceivedMessage");
    }
}
/// Handle returned by `Subscriber::connect`; wraps the abort handle of the subscriber's
/// request-serving future, which is aborted when this handle is dropped.
struct SubscriberHandle(AbortHandle);
impl Drop for SubscriberHandle {
fn drop(&mut self) {
self.0.abort();
}
}
impl Subscriber {
    /// Connects to the publisher's subscription endpoint and starts serving its requests.
    ///
    /// The first incoming request is executed inline before this function returns; every
    /// later request is served from a spawned, abortable task. Dropping the returned
    /// [`SubscriberHandle`] aborts that task.
    ///
    /// # Errors
    ///
    /// Fails if the connection cannot be established, the local address cannot be read,
    /// the first request is itself an error, or the request stream ends before any request
    /// arrives.
    async fn connect(publisher_addr: impl ToSocketAddrs, topics: Vec<String>) -> anyhow::Result<SubscriberHandle> {
        let transport = tcp::connect(publisher_addr, Json::default).await?;
        let local_addr = transport.local_addr()?;
        let subscriber = Subscriber { local_addr, topics };
        let mut requests = server::BaseChannel::with_defaults(transport).requests();

        // Handle the very first request synchronously so the caller only gets a handle once
        // the publisher has received an answer to it.
        let Some(first_request) = requests.next().await else {
            return Err(anyhow!("[{}] Server never initialized the subscriber.", local_addr));
        };
        first_request?.execute(subscriber.clone().logimesh_serve()).await;

        // Everything after that runs in the background until aborted or the stream ends.
        let serve_remaining = requests.execute(subscriber.logimesh_serve()).for_each(spawn);
        let (serving, abort_handle) = future::abortable(serve_remaining);
        tokio::spawn(async move {
            match serving.await {
                Ok(()) | Err(future::Aborted) => info!(?local_addr, "subscriber shutdown."),
            }
        });

        Ok(SubscriberHandle(abort_handle))
    }
}
/// Bookkeeping record the publisher stores per connected subscriber.
#[derive(Debug)]
struct Subscription {
    /// Topics the subscriber was registered under; used to undo the registration later.
    topics: Vec<String>,
}
/// Publisher state; clones share the same underlying maps through the `Arc`s.
#[derive(Debug, Clone)]
struct Publisher {
    /// Registered subscribers keyed by their peer address.
    clients: Arc<Mutex<HashMap<SocketAddr, Subscription>>>,
    /// Topic name -> (subscriber address -> RPC client used to reach that subscriber).
    subscriptions: Arc<
        RwLock<
            HashMap<
                String,
                HashMap<SocketAddr, subscriber::SubscriberClient>,
            >,
        >,
    >,
}
/// The two listening addresses produced by `Publisher::start`.
struct PublisherAddrs {
    /// Address on which publishing clients connect.
    publisher: SocketAddr,
    /// Address on which subscribers connect.
    subscriptions: SocketAddr,
}
/// Hands `fut` to the Tokio runtime and discards the join handle, so the task runs detached.
///
/// Shaped as an `async fn` so it can be passed directly to `for_each`.
async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
    let _detached = tokio::spawn(fut);
}
impl Publisher {
async fn start(self) -> io::Result<PublisherAddrs> {
let mut connecting_publishers = tcp::listen("localhost:0", Json::default).await?;
let publisher_addrs = PublisherAddrs {
publisher: connecting_publishers.local_addr(),
subscriptions: self.clone().start_subscription_manager().await?,
};
info!(publisher_addr = %publisher_addrs.publisher, "listening for publishers.",);
tokio::spawn(async move {
let publisher = connecting_publishers.next().await.unwrap().unwrap();
info!(publisher.peer_addr = ?publisher.peer_addr(), "publisher connected.");
server::BaseChannel::with_defaults(publisher).execute(self.logimesh_serve()).for_each(spawn).await
});
Ok(publisher_addrs)
}
async fn start_subscription_manager(mut self) -> io::Result<SocketAddr> {
let mut connecting_subscribers = tcp::listen("localhost:0", Json::default).await?.filter_map(|r| future::ready(r.ok()));
let new_subscriber_addr = connecting_subscribers.get_ref().local_addr();
info!(?new_subscriber_addr, "listening for subscribers.");
tokio::spawn(async move {
while let Some(conn) = connecting_subscribers.next().await {
let subscriber_addr = conn.peer_addr().unwrap();
let logimesh::client::core::NewClient { client: subscriber, dispatch } = subscriber::SubscriberClient::new(client::core::Config::default(), conn);
let (ready_tx, ready) = oneshot::channel();
self.clone().start_subscriber_gc(subscriber_addr, dispatch, ready);
self.initialize_subscription(subscriber_addr, subscriber).await;
ready_tx.send(()).unwrap();
}
});
Ok(new_subscriber_addr)
}
async fn initialize_subscription(&mut self, subscriber_addr: SocketAddr, subscriber: subscriber::SubscriberClient) {
if let Ok(topics) = subscriber.topics(context::current()).await {
self.clients.lock().unwrap().insert(subscriber_addr, Subscription { topics: topics.clone() });
info!(%subscriber_addr, ?topics, "subscribed to new topics");
let mut subscriptions = self.subscriptions.write().unwrap();
for topic in topics {
subscriptions.entry(topic).or_default().insert(subscriber_addr, subscriber.clone());
}
}
}
fn start_subscriber_gc<E: Error>(self, subscriber_addr: SocketAddr, client_dispatch: impl Future<Output = Result<(), E>> + Send + 'static, subscriber_ready: oneshot::Receiver<()>) {
tokio::spawn(async move {
if let Err(e) = client_dispatch.await {
info!(
%subscriber_addr,
error = %e,
"subscriber connection broken");
}
let _ = subscriber_ready.await;
if let Some(subscription) = self.clients.lock().unwrap().remove(&subscriber_addr) {
info!("[{} unsubscribing from topics: {:?}", subscriber_addr, subscription.topics);
let mut subscriptions = self.subscriptions.write().unwrap();
for topic in subscription.topics {
let subscribers = subscriptions.get_mut(&topic).unwrap();
subscribers.remove(&subscriber_addr);
if subscribers.is_empty() {
subscriptions.remove(&topic);
}
}
}
});
}
}
impl publisher::Publisher for Publisher {
    /// Forwards `message` to every subscriber currently registered for `topic`.
    ///
    /// Returns immediately when the topic has no subscribers. All deliveries are awaited
    /// together; individual failures are logged and otherwise ignored.
    async fn publish(self, _: context::Context, topic: String, message: String) {
        info!("received message to publish.");

        // Copy the subscriber map for this topic so the read lock is released before any
        // RPC is awaited.
        let snapshot = self.subscriptions.read().unwrap().get(&topic).cloned();
        let Some(mut subscribers) = snapshot else {
            return;
        };

        let publications: Vec<_> = subscribers
            .values_mut()
            .map(|client| client.receive(context::current(), topic.clone(), message.clone()))
            .collect();

        let failures = future::join_all(publications).await.into_iter().filter_map(Result::err);
        for e in failures {
            info!("failed to broadcast to subscriber: {}", e);
        }
    }
}
/// Installs the global tracing setup for this process.
///
/// Builds an OTLP (tonic) batch trace pipeline tagged with `service_name`, registers it as the
/// global tracer provider, and initialises a `tracing` subscriber made of an env-based
/// filter, a formatting layer, and an OpenTelemetry layer.
///
/// # Errors
///
/// Fails if the pipeline cannot be installed or the subscriber cannot be initialised.
pub fn init_tracing(service_name: &'static str) -> anyhow::Result<()> {
    let service_attribute = opentelemetry::KeyValue::new(
        opentelemetry_semantic_conventions::resource::SERVICE_NAME,
        service_name,
    );
    let resource = opentelemetry_sdk::Resource::new([service_attribute]);
    let trace_config = opentelemetry_sdk::trace::Config::default().with_resource(resource);
    let batch_config = opentelemetry_sdk::trace::BatchConfig::default();
    let exporter = opentelemetry_otlp::new_exporter().tonic();

    let tracer_provider = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_batch_config(batch_config)
        .with_exporter(exporter)
        .with_trace_config(trace_config)
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;
    opentelemetry::global::set_tracer_provider(tracer_provider.clone());

    let tracer = tracer_provider.tracer(service_name);
    let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::from_default_env())
        .with(tracing_subscriber::fmt::layer())
        .with(telemetry_layer)
        .try_init()?;

    Ok(())
}
/// Demo: starts a publisher, connects two subscribers with overlapping topics, publishes a
/// message per topic, drops the first subscriber, and publishes once more.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    init_tracing("Pub/Sub")?;

    let broker = Publisher {
        clients: Arc::new(Mutex::new(HashMap::new())),
        subscriptions: Arc::new(RwLock::new(HashMap::new())),
    };
    let addrs = broker.start().await?;

    let subscriber0 = Subscriber::connect(addrs.subscriptions, vec!["calculus".into(), "cool shorts".into()]).await?;
    // Held only to keep the second subscriber alive until `main` returns.
    let _subscriber1 = Subscriber::connect(addrs.subscriptions, vec!["cool shorts".into(), "history".into()]).await?;

    let client_config = client::core::Config::default();
    let publisher_transport = tcp::connect(addrs.publisher, Json::default).await?;
    let publisher = publisher::PublisherClient::new(client_config, publisher_transport).spawn();

    let opening_round = [
        ("calculus", "sqrt(2)"),
        ("cool shorts", "hello to all"),
        ("history", "napoleon"),
    ];
    for (topic, message) in opening_round {
        publisher.publish(context::current(), topic.into(), message.into()).await?;
    }

    // Dropping the handle aborts the first subscriber before the final publish.
    drop(subscriber0);
    publisher.publish(context::current(), "cool shorts".into(), "hello to who?".into()).await?;

    opentelemetry::global::shutdown_tracer_provider();
    info!("done.");
    Ok(())
}