Skip to main content

mcpr_integrations/emitter/
cloud_sink.rs

1//! Cloud event sink — batches proxy events and POSTs them to cloud.mcpr.app.
2//!
3//! Implements `EventSink` from mcpr-core. Internally buffers events and
4//! flushes on batch size or interval, with retry + exponential backoff.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use mcpr_core::event::{EventSink, ProxyEvent};
10use tokio::sync::mpsc;
11
/// Callback invoked after each cloud sync attempt.
///
/// Wrapped in `Arc` so the sink can share it with its background flush
/// task; `Send + Sync` because it is called from that task.
pub type SyncCallback = Arc<dyn Fn(SyncStatus) + Send + Sync>;

/// Result of a cloud sync flush, reported through [`SyncCallback`].
// Derives added: public types should be `Debug`, and both payloads
// (`usize`, `String`) are trivially cloneable.
#[derive(Debug, Clone)]
pub enum SyncStatus {
    /// The batch was accepted by the cloud API; `count` is the number
    /// of events in the accepted batch.
    Ok { count: usize },
    /// The flush attempt failed; `message` carries the HTTP status and
    /// body, or the transport error text.
    Failed { message: String },
}
20
21/// Configuration for the cloud sink.
22pub struct CloudSinkConfig {
23    /// Full ingest URL, e.g. "https://api.mcpr.app/api/ingest-events"
24    pub endpoint: String,
25    /// Project token, e.g. "mcpr_xxxxxxxx"
26    pub token: String,
27    /// Server slug — identifies which server in the cloud project
28    pub server: Option<String>,
29    /// Flush when buffer reaches this size (default: 100)
30    pub batch_size: usize,
31    /// Flush on this interval even if buffer isn't full (default: 5s)
32    pub flush_interval: Duration,
33    /// Optional callback for reporting sync status
34    pub on_flush: Option<SyncCallback>,
35}
36
37/// Cloud event sink — batches and POSTs proxy events to the cloud API.
38///
39/// Events are queued via an internal mpsc channel. A background tokio task
40/// drains the channel and flushes batches with retry.
41pub struct CloudSink {
42    tx: mpsc::Sender<ProxyEvent>,
43}
44
45impl CloudSink {
46    pub fn new(config: CloudSinkConfig) -> Self {
47        let (tx, rx) = mpsc::channel::<ProxyEvent>(1000);
48        tokio::spawn(flush_loop(rx, config));
49        Self { tx }
50    }
51}
52
53impl EventSink for CloudSink {
54    fn on_event(&self, event: &ProxyEvent) {
55        // Non-blocking: clone and send. Drop if channel is full.
56        let _ = self.tx.try_send(event.clone());
57    }
58
59    fn name(&self) -> &'static str {
60        "cloud"
61    }
62}
63
64async fn flush_loop(mut rx: mpsc::Receiver<ProxyEvent>, config: CloudSinkConfig) {
65    let client = reqwest::Client::builder()
66        .timeout(Duration::from_secs(10))
67        .build()
68        .unwrap();
69
70    let mut buffer: Vec<ProxyEvent> = Vec::with_capacity(config.batch_size);
71    let mut interval = tokio::time::interval(config.flush_interval);
72
73    loop {
74        tokio::select! {
75            msg = rx.recv() => {
76                let Some(event) = msg else {
77                    // Channel closed — flush remaining.
78                    if !buffer.is_empty() {
79                        flush_batch(&client, &config, &mut buffer).await;
80                    }
81                    break;
82                };
83
84                buffer.push(event);
85
86                if buffer.len() >= config.batch_size {
87                    flush_batch(&client, &config, &mut buffer).await;
88                }
89            }
90            _ = interval.tick() => {
91                if !buffer.is_empty() {
92                    flush_batch(&client, &config, &mut buffer).await;
93                }
94            }
95        }
96    }
97}
98
99async fn flush_batch(
100    client: &reqwest::Client,
101    config: &CloudSinkConfig,
102    buffer: &mut Vec<ProxyEvent>,
103) {
104    let events = std::mem::take(buffer);
105
106    // Convert to JSON — the cloud API accepts the ProxyEvent format directly.
107    // Stamp server slug on each event.
108    let payload: Vec<serde_json::Value> = events
109        .iter()
110        .map(|e| {
111            let mut val = serde_json::to_value(e).unwrap_or(serde_json::Value::Null);
112            if let Some(ref server) = config.server
113                && let Some(obj) = val.as_object_mut()
114            {
115                obj.entry("server")
116                    .or_insert(serde_json::Value::String(server.clone()));
117            }
118            val
119        })
120        .collect();
121
122    let body = match serde_json::to_vec(&payload) {
123        Ok(b) => b,
124        Err(_) => return,
125    };
126
127    // Retry with exponential backoff: 1s, 2s, 4s
128    for attempt in 0..3u32 {
129        match client
130            .post(&config.endpoint)
131            .header("Authorization", format!("Bearer {}", config.token))
132            .header("Content-Type", "application/json")
133            .body(body.clone())
134            .send()
135            .await
136        {
137            Ok(resp) if matches!(resp.status().as_u16(), 200 | 202) => {
138                if let Some(ref cb) = config.on_flush {
139                    cb(SyncStatus::Ok {
140                        count: events.len(),
141                    });
142                }
143                return;
144            }
145            Ok(resp) => {
146                let status = resp.status();
147                let body = resp.text().await.unwrap_or_default();
148                if let Some(ref cb) = config.on_flush {
149                    cb(SyncStatus::Failed {
150                        message: format!("HTTP {status} — {body}"),
151                    });
152                }
153            }
154            Err(e) => {
155                if let Some(ref cb) = config.on_flush {
156                    cb(SyncStatus::Failed {
157                        message: e.to_string(),
158                    });
159                }
160            }
161        }
162        tokio::time::sleep(Duration::from_secs(1 << attempt)).await;
163    }
164
165    if let Some(ref cb) = config.on_flush {
166        cb(SyncStatus::Failed {
167            message: format!("dropped {} events after 3 retries", events.len()),
168        });
169    }
170}