use std::sync::Arc;
use std::time::Duration;
use mcpr_core::event::{EventSink, ProxyEvent};
use tokio::sync::mpsc;
pub type SyncCallback = Arc<dyn Fn(SyncStatus) + Send + Sync>;
pub enum SyncStatus {
Ok { count: usize },
Failed { message: String },
}
pub struct CloudSinkConfig {
pub endpoint: String,
pub token: String,
pub server: Option<String>,
pub batch_size: usize,
pub flush_interval: Duration,
pub on_flush: Option<SyncCallback>,
}
pub struct CloudSink {
tx: mpsc::Sender<ProxyEvent>,
}
impl CloudSink {
pub fn new(config: CloudSinkConfig) -> Self {
let (tx, rx) = mpsc::channel::<ProxyEvent>(1000);
tokio::spawn(flush_loop(rx, config));
Self { tx }
}
}
impl EventSink for CloudSink {
fn on_event(&self, event: &ProxyEvent) {
let _ = self.tx.try_send(event.clone());
}
fn name(&self) -> &'static str {
"cloud"
}
}
async fn flush_loop(mut rx: mpsc::Receiver<ProxyEvent>, config: CloudSinkConfig) {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.unwrap();
let mut buffer: Vec<ProxyEvent> = Vec::with_capacity(config.batch_size);
let mut interval = tokio::time::interval(config.flush_interval);
loop {
tokio::select! {
msg = rx.recv() => {
let Some(event) = msg else {
if !buffer.is_empty() {
flush_batch(&client, &config, &mut buffer).await;
}
break;
};
buffer.push(event);
if buffer.len() >= config.batch_size {
flush_batch(&client, &config, &mut buffer).await;
}
}
_ = interval.tick() => {
if !buffer.is_empty() {
flush_batch(&client, &config, &mut buffer).await;
}
}
}
}
}
async fn flush_batch(
client: &reqwest::Client,
config: &CloudSinkConfig,
buffer: &mut Vec<ProxyEvent>,
) {
let events = std::mem::take(buffer);
let payload: Vec<serde_json::Value> = events
.iter()
.map(|e| {
let mut val = serde_json::to_value(e).unwrap_or(serde_json::Value::Null);
if let Some(ref server) = config.server
&& let Some(obj) = val.as_object_mut()
{
obj.entry("server")
.or_insert(serde_json::Value::String(server.clone()));
}
val
})
.collect();
let body = match serde_json::to_vec(&payload) {
Ok(b) => b,
Err(_) => return,
};
for attempt in 0..3u32 {
match client
.post(&config.endpoint)
.header("Authorization", format!("Bearer {}", config.token))
.header("Content-Type", "application/json")
.body(body.clone())
.send()
.await
{
Ok(resp) if matches!(resp.status().as_u16(), 200 | 202) => {
if let Some(ref cb) = config.on_flush {
cb(SyncStatus::Ok {
count: events.len(),
});
}
return;
}
Ok(resp) => {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
if let Some(ref cb) = config.on_flush {
cb(SyncStatus::Failed {
message: format!("HTTP {status} — {body}"),
});
}
}
Err(e) => {
if let Some(ref cb) = config.on_flush {
cb(SyncStatus::Failed {
message: e.to_string(),
});
}
}
}
tokio::time::sleep(Duration::from_secs(1 << attempt)).await;
}
if let Some(ref cb) = config.on_flush {
cb(SyncStatus::Failed {
message: format!("dropped {} events after 3 retries", events.len()),
});
}
}