mcpr_integrations/emitter/
cloud_sink.rs1use std::sync::Arc;
7use std::time::Duration;
8
9use mcpr_core::event::{EventSink, ProxyEvent};
10use tokio::sync::mpsc;
11
/// Callback through which the background flush task reports the outcome of
/// each upload attempt.
///
/// The `Send + Sync` bounds let the callback be moved into, and shared with,
/// the spawned flush task.
pub type SyncCallback = Arc<dyn Fn(SyncStatus) + Send + Sync>;

/// Outcome of an attempt to upload a batch of events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncStatus {
    /// The batch was accepted; `count` is the number of events it contained.
    Ok { count: usize },
    /// The attempt failed, or the batch was given up on; `message` says why.
    Failed { message: String },
}
20
21pub struct CloudSinkConfig {
23 pub endpoint: String,
25 pub token: String,
27 pub server: Option<String>,
29 pub batch_size: usize,
31 pub flush_interval: Duration,
33 pub on_flush: Option<SyncCallback>,
35}
36
37pub struct CloudSink {
42 tx: mpsc::Sender<ProxyEvent>,
43}
44
45impl CloudSink {
46 pub fn new(config: CloudSinkConfig) -> Self {
47 let (tx, rx) = mpsc::channel::<ProxyEvent>(1000);
48 tokio::spawn(flush_loop(rx, config));
49 Self { tx }
50 }
51}
52
53impl EventSink for CloudSink {
54 fn on_event(&self, event: &ProxyEvent) {
55 let _ = self.tx.try_send(event.clone());
57 }
58
59 fn name(&self) -> &'static str {
60 "cloud"
61 }
62}
63
64async fn flush_loop(mut rx: mpsc::Receiver<ProxyEvent>, config: CloudSinkConfig) {
65 let client = reqwest::Client::builder()
66 .timeout(Duration::from_secs(10))
67 .build()
68 .unwrap();
69
70 let mut buffer: Vec<ProxyEvent> = Vec::with_capacity(config.batch_size);
71 let mut interval = tokio::time::interval(config.flush_interval);
72
73 loop {
74 tokio::select! {
75 msg = rx.recv() => {
76 let Some(event) = msg else {
77 if !buffer.is_empty() {
79 flush_batch(&client, &config, &mut buffer).await;
80 }
81 break;
82 };
83
84 buffer.push(event);
85
86 if buffer.len() >= config.batch_size {
87 flush_batch(&client, &config, &mut buffer).await;
88 }
89 }
90 _ = interval.tick() => {
91 if !buffer.is_empty() {
92 flush_batch(&client, &config, &mut buffer).await;
93 }
94 }
95 }
96 }
97}
98
99async fn flush_batch(
100 client: &reqwest::Client,
101 config: &CloudSinkConfig,
102 buffer: &mut Vec<ProxyEvent>,
103) {
104 let events = std::mem::take(buffer);
105
106 let payload: Vec<serde_json::Value> = events
109 .iter()
110 .map(|e| {
111 let mut val = serde_json::to_value(e).unwrap_or(serde_json::Value::Null);
112 if let Some(ref server) = config.server
113 && let Some(obj) = val.as_object_mut()
114 {
115 obj.entry("server")
116 .or_insert(serde_json::Value::String(server.clone()));
117 }
118 val
119 })
120 .collect();
121
122 let body = match serde_json::to_vec(&payload) {
123 Ok(b) => b,
124 Err(_) => return,
125 };
126
127 for attempt in 0..3u32 {
129 match client
130 .post(&config.endpoint)
131 .header("Authorization", format!("Bearer {}", config.token))
132 .header("Content-Type", "application/json")
133 .body(body.clone())
134 .send()
135 .await
136 {
137 Ok(resp) if matches!(resp.status().as_u16(), 200 | 202) => {
138 if let Some(ref cb) = config.on_flush {
139 cb(SyncStatus::Ok {
140 count: events.len(),
141 });
142 }
143 return;
144 }
145 Ok(resp) => {
146 let status = resp.status();
147 let body = resp.text().await.unwrap_or_default();
148 if let Some(ref cb) = config.on_flush {
149 cb(SyncStatus::Failed {
150 message: format!("HTTP {status} — {body}"),
151 });
152 }
153 }
154 Err(e) => {
155 if let Some(ref cb) = config.on_flush {
156 cb(SyncStatus::Failed {
157 message: e.to_string(),
158 });
159 }
160 }
161 }
162 tokio::time::sleep(Duration::from_secs(1 << attempt)).await;
163 }
164
165 if let Some(ref cb) = config.on_flush {
166 cb(SyncStatus::Failed {
167 message: format!("dropped {} events after 3 retries", events.len()),
168 });
169 }
170}