use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::horizontal_adapter::{
BroadcastMessage, HorizontalAdapter, RequestBody, RequestType, ResponseBody,
generate_request_id,
};
use async_trait::async_trait;
use sockudo_core::error::Result;
use sockudo_core::metrics::MetricsInterface;
/// A pinned, heap-allocated, `Send` future that yields `T` and may borrow data for `'a`.
///
/// Used as the return type of the callbacks stored in [`TransportHandlers`].
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// Bundle of callbacks passed to [`HorizontalTransport::start_listeners`].
///
/// Every callback is a shared (`Arc`), `Send + Sync` closure that returns a
/// `'static` [`BoxFuture`].
pub struct TransportHandlers {
/// Node identifier supplied alongside the callbacks.
/// NOTE(review): presumably the local node's id — confirm against the caller.
pub node_id: String,
/// Callback that receives a [`BroadcastMessage`]; its future resolves to `()`.
pub on_broadcast: Arc<dyn Fn(BroadcastMessage) -> BoxFuture<'static, ()> + Send + Sync>,
/// Callback that receives a [`RequestBody`]; its future resolves to a
/// `Result<ResponseBody>`.
pub on_request:
Arc<dyn Fn(RequestBody) -> BoxFuture<'static, Result<ResponseBody>> + Send + Sync>,
/// Callback that receives a [`ResponseBody`]; its future resolves to `()`.
pub on_response: Arc<dyn Fn(ResponseBody) -> BoxFuture<'static, ()> + Send + Sync>,
}
/// Transport abstraction for publishing and listening for broadcasts, requests
/// and responses.
///
/// Implementors must be `Send + Sync + Clone`. Only the methods without a body
/// are required; the rest ship with defaults that an implementor may override.
#[async_trait]
pub trait HorizontalTransport: Send + Sync + Clone {
/// Implementor-specific configuration consumed by [`HorizontalTransport::new`].
type Config: Send + Sync;
/// Builds a transport from `config`.
async fn new(config: Self::Config) -> Result<Self>;
/// Publishes a broadcast message.
async fn publish_broadcast(&self, message: &BroadcastMessage) -> Result<()>;
/// Publishes a request.
async fn publish_request(&self, request: &RequestBody) -> Result<()>;
/// Publishes a response.
async fn publish_response(&self, response: &ResponseBody) -> Result<()>;
/// Starts the transport's listeners, handing them the `handlers` callbacks.
/// NOTE(review): when and how each callback is invoked is decided by the
/// implementor; nothing here constrains it.
async fn start_listeners(&self, handlers: TransportHandlers) -> Result<()>;
/// Returns a node count as reported by the transport.
async fn get_node_count(&self) -> Result<usize>;
/// Reports whether the node count is real-time. Defaults to `false`.
/// NOTE(review): presumably qualifies the value returned by
/// [`HorizontalTransport::get_node_count`] — confirm with implementors.
fn node_count_is_real_time(&self) -> bool {
false
}
/// Checks the transport's health; failures are reported through the `Result`.
async fn check_health(&self) -> Result<()>;
/// Supplies a metrics sink. The default implementation discards it.
fn set_metrics(&self, _metrics: Arc<dyn MetricsInterface + Send + Sync>) {}
/// Returns a new inbox identifier, or `None` (the default).
/// NOTE(review): presumably a reply address usable as `reply_to` in
/// [`HorizontalTransport::publish_request_with_reply`] — confirm.
fn new_inbox(&self) -> Option<String> {
None
}
/// Publishes `request` with an associated reply address.
///
/// The default implementation ignores `_reply_to` and delegates to
/// [`HorizontalTransport::publish_request`].
async fn publish_request_with_reply(
&self,
request: &RequestBody,
_reply_to: &str,
) -> Result<()> {
self.publish_request(request).await
}
/// Publishes `request` addressed to a specific node.
///
/// The default implementation ignores `_target_node_id` and delegates to
/// [`HorizontalTransport::publish_request`].
async fn publish_request_to_node(
&self,
request: &RequestBody,
_target_node_id: &str,
) -> Result<()> {
self.publish_request(request).await
}
/// Sends this node's presence state to `target_node_id`.
///
/// The default implementation delegates to [`send_presence_state_to_node`].
async fn sync_presence_state_to_node(
&self,
horizontal: &Arc<HorizontalAdapter>,
target_node_id: &str,
) -> Result<()> {
send_presence_state_to_node(self, horizontal, target_node_id).await
}
}
/// Sends this node's presence data to `target_node_id` as one
/// `RequestType::PresenceStateSync` request.
///
/// The entry stored under `horizontal.node_id` in the cluster presence registry
/// is serialized into the request's `user_info` field and published through
/// `transport.publish_request_to_node`.
///
/// Returns `Ok(())` without publishing when the registry has no entry for this
/// node or when that entry is empty.
///
/// # Errors
///
/// Propagates serialization failures and any error returned by the transport.
pub(crate) async fn send_presence_state_to_node<T: HorizontalTransport>(
    transport: &T,
    horizontal: &Arc<HorizontalAdapter>,
    target_node_id: &str,
) -> Result<()> {
    // Serialize inside a block so the read guard is released before the
    // publish call is awaited.
    let serialized_presence = {
        let presence = horizontal.cluster_presence_registry.read().await;
        let local_entries = match presence.get(&horizontal.node_id) {
            Some(entries) => entries,
            None => {
                tracing::debug!("No presence data to send to new node: {}", target_node_id);
                return Ok(());
            }
        };
        if local_entries.is_empty() {
            tracing::debug!("Empty presence data for new node: {}", target_node_id);
            return Ok(());
        }
        sonic_rs::to_value(local_entries)?
    };

    let target = target_node_id.to_string();
    let sync_request = RequestBody {
        request_id: generate_request_id(),
        node_id: horizontal.node_id.clone(),
        app_id: "cluster".to_string(),
        request_type: RequestType::PresenceStateSync,
        target_node_id: Some(target),
        user_info: Some(serialized_presence),
        channel: None,
        socket_id: None,
        user_id: None,
        timestamp: None,
        dead_node_id: None,
        channels: None,
    };

    transport
        .publish_request_to_node(&sync_request, target_node_id)
        .await?;

    tracing::info!(
        "Sent presence state to new node: {} (single message)",
        target_node_id
    );
    Ok(())
}
/// Accessors shared by transport configuration types.
///
/// Implementors must be `Send + Sync + Clone`.
pub trait TransportConfig: Send + Sync + Clone {
/// Returns the request timeout; the unit is milliseconds per the method name.
fn request_timeout_ms(&self) -> u64;
/// Returns the configured prefix string.
/// NOTE(review): presumably used to namespace the transport's channel/topic
/// names — confirm with implementors.
fn prefix(&self) -> &str;
}