1use std::collections::HashMap;
17use std::time::{SystemTime, UNIX_EPOCH};
18
/// Version number stamped into every `MetricEvent::schema_version`;
/// bump when the serialized event shape changes.
pub const SCHEMA_VERSION: u32 = 1;
21
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Falls back to 0 if the system clock reads earlier than the epoch
/// (the only failure mode of `duration_since`).
#[inline]
fn now_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
33
/// Serializes each event with `formatter` and joins the results with
/// newlines (empty string for an empty slice).
fn format_events<T, F: Fn(&T) -> String>(events: &[T], formatter: F) -> String {
    let mut rendered = Vec::with_capacity(events.len());
    for event in events {
        rendered.push(formatter(event));
    }
    rendered.join("\n")
}
39
/// Default number of events accumulated in a batch before `EventStreamer`
/// flushes automatically.
pub const DEFAULT_BATCH_SIZE: usize = 100;
42
/// The destination a batch of metric events is written to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SinkType {
    /// InfluxDB line-protocol output.
    InfluxDb,
    /// Newline-delimited JSON output.
    JsonLines,
    /// Kafka topic output (serialized as JSON lines).
    Kafka,
    /// Direct printing to stdout.
    Console,
}

impl SinkType {
    /// Canonical lowercase identifier for this sink.
    pub fn name(&self) -> &'static str {
        match self {
            SinkType::Console => "console",
            SinkType::Kafka => "kafka",
            SinkType::JsonLines => "jsonlines",
            SinkType::InfluxDb => "influxdb",
        }
    }
}
67
/// A single metric sample: a measurement name plus tags, numeric fields,
/// a timestamp, and optional correlation metadata.
#[derive(Debug, Clone)]
pub struct MetricEvent {
    /// Measurement/series name this sample belongs to.
    pub measurement: String,
    /// String key/value tags attached to the sample.
    pub tags: HashMap<String, String>,
    /// Numeric field values keyed by field name.
    pub fields: HashMap<String, f64>,
    /// Timestamp in nanoseconds since the Unix epoch.
    pub timestamp_ns: u64,
    /// Optional id for correlating this event with related events.
    pub correlation_id: Option<String>,
    /// Serialization schema version (set to `SCHEMA_VERSION` by `new`).
    pub schema_version: u32,
}
84
85impl MetricEvent {
86 pub fn new(measurement: &str) -> Self {
88 Self {
89 measurement: measurement.to_string(),
90 tags: HashMap::new(),
91 fields: HashMap::new(),
92 timestamp_ns: now_nanos(),
93 correlation_id: None,
94 schema_version: SCHEMA_VERSION,
95 }
96 }
97
98 pub fn with_tag(mut self, key: &str, value: &str) -> Self {
100 self.tags.insert(key.to_string(), value.to_string());
101 self
102 }
103
104 pub fn with_field(mut self, key: &str, value: f64) -> Self {
106 self.fields.insert(key.to_string(), value);
107 self
108 }
109
110 pub fn with_correlation_id(mut self, id: &str) -> Self {
112 self.correlation_id = Some(id.to_string());
113 self
114 }
115
116 pub fn with_timestamp(mut self, timestamp_ns: u64) -> Self {
118 self.timestamp_ns = timestamp_ns;
119 self
120 }
121
122 pub fn to_influx_line(&self) -> String {
124 let mut line = self.measurement.clone();
126
127 let mut tag_pairs: Vec<_> = self.tags.iter().collect();
129 tag_pairs.sort_by_key(|(k, _)| *k);
130 for (key, value) in tag_pairs {
131 line.push_str(&format!(",{}={}", escape_influx(key), escape_influx(value)));
132 }
133
134 line.push(' ');
136 let mut field_pairs: Vec<_> = self.fields.iter().collect();
137 field_pairs.sort_by_key(|(k, _)| *k);
138 let field_str: Vec<String> = field_pairs
139 .iter()
140 .map(|(k, v)| format!("{}={}", k, v))
141 .collect();
142 line.push_str(&field_str.join(","));
143
144 line.push_str(&format!(" {}", self.timestamp_ns));
146
147 line
148 }
149
150 pub fn to_json(&self) -> String {
152 let tags_json: Vec<String> = self
153 .tags
154 .iter()
155 .map(|(k, v)| format!("\"{}\":\"{}\"", k, v))
156 .collect();
157
158 let fields_json: Vec<String> = self
159 .fields
160 .iter()
161 .map(|(k, v)| format!("\"{}\":{}", k, v))
162 .collect();
163
164 let correlation = self
165 .correlation_id
166 .as_ref()
167 .map(|id| format!(",\"correlation_id\":\"{}\"", id))
168 .unwrap_or_default();
169
170 format!(
171 r#"{{"measurement":"{}","tags":{{{}}},"fields":{{{}}},"timestamp_ns":{},"schema_version":{}{}}}"#,
172 self.measurement,
173 tags_json.join(","),
174 fields_json.join(","),
175 self.timestamp_ns,
176 self.schema_version,
177 correlation
178 )
179 }
180}
181
/// Escapes the characters significant in InfluxDB line-protocol tag
/// keys/values — space, comma, and equals — by prefixing a backslash.
fn escape_influx(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        if matches!(c, ' ' | ',' | '=') {
            escaped.push('\\');
        }
        escaped.push(c);
    }
    escaped
}
188
/// A group of events flushed to a sink together.
#[derive(Debug, Clone)]
pub struct EventBatch {
    /// Events accumulated in this batch, in insertion order.
    pub events: Vec<MetricEvent>,
    /// Monotonically increasing id assigned by the streamer.
    pub batch_id: u64,
    /// Creation time in nanoseconds since the Unix epoch.
    pub created_ns: u64,
}
199
200impl EventBatch {
201 pub fn new(batch_id: u64) -> Self {
203 Self {
204 events: Vec::new(),
205 batch_id,
206 created_ns: now_nanos(),
207 }
208 }
209
210 pub fn add(&mut self, event: MetricEvent) {
212 self.events.push(event);
213 }
214
215 pub fn len(&self) -> usize {
217 self.events.len()
218 }
219
220 pub fn is_empty(&self) -> bool {
222 self.events.is_empty()
223 }
224
225 pub fn to_influx_batch(&self) -> String {
227 format_events(&self.events, MetricEvent::to_influx_line)
228 }
229
230 pub fn to_json_lines(&self) -> String {
232 format_events(&self.events, MetricEvent::to_json)
233 }
234}
235
/// Running health statistics for a sink.
#[derive(Debug, Clone)]
pub struct SinkHealth {
    /// Whether the sink is considered reachable.
    pub connected: bool,
    /// Timestamp (ns since epoch) of the most recent successful write.
    pub last_write_ns: Option<u64>,
    /// Total events written successfully.
    pub events_written: u64,
    /// Total failed batch writes.
    pub write_errors: u64,
}

impl Default for SinkHealth {
    /// Fresh health state: optimistic `connected`, no writes recorded yet.
    fn default() -> Self {
        SinkHealth {
            connected: true,
            events_written: 0,
            write_errors: 0,
            last_write_ns: None,
        }
    }
}
259
/// Exponential-backoff settings for retrying failed sink writes.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound on any computed delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier applied per attempt (e.g. 2.0 doubles each time).
    pub multiplier: f64,
}

impl Default for RetryConfig {
    /// 3 retries, starting at 100 ms, doubling, capped at 10 s.
    fn default() -> Self {
        RetryConfig {
            max_retries: 3,
            initial_delay_ms: 100,
            max_delay_ms: 10_000,
            multiplier: 2.0,
        }
    }
}

impl RetryConfig {
    /// Backoff delay in milliseconds for the given zero-based attempt,
    /// capped at `max_delay_ms`.
    pub fn delay_for_attempt(&self, attempt: u32) -> u64 {
        let scaled = self.multiplier.powi(attempt as i32) * self.initial_delay_ms as f64;
        u64::min(scaled as u64, self.max_delay_ms)
    }
}
291
/// Batches `MetricEvent`s and writes them to a configured sink.
#[derive(Debug)]
pub struct EventStreamer {
    /// Destination sink (console/influx/jsonlines/kafka).
    sink_type: SinkType,
    /// Events per batch before an automatic flush (minimum 1).
    batch_size: usize,
    /// Batch currently accumulating events.
    current_batch: EventBatch,
    /// Id counter used to number successive batches.
    batch_counter: u64,
    /// Backoff policy for failed writes.
    /// NOTE(review): not consulted by the write path visible in this file.
    retry_config: RetryConfig,
    /// Running write statistics.
    health: SinkHealth,
    /// Compression preference.
    /// NOTE(review): stored but never read by `write_batch` here — confirm
    /// whether compression is applied elsewhere.
    compression: bool,
    /// Counter backing `generate_correlation_id`.
    correlation_counter: u64,
    /// Serialized batches for non-console sinks, pending real transport.
    output_buffer: Vec<String>,
}
314
315impl Default for EventStreamer {
316 fn default() -> Self {
317 Self::new(SinkType::Console)
318 }
319}
320
321impl EventStreamer {
322 pub fn new(sink_type: SinkType) -> Self {
324 Self {
325 sink_type,
326 batch_size: DEFAULT_BATCH_SIZE,
327 current_batch: EventBatch::new(0),
328 batch_counter: 0,
329 retry_config: RetryConfig::default(),
330 health: SinkHealth::default(),
331 compression: false,
332 correlation_counter: 0,
333 output_buffer: Vec::new(),
334 }
335 }
336
337 pub fn with_batch_size(mut self, size: usize) -> Self {
339 self.batch_size = size.max(1);
340 self
341 }
342
343 pub fn with_compression(mut self, enabled: bool) -> Self {
345 self.compression = enabled;
346 self
347 }
348
349 pub fn with_retry(mut self, config: RetryConfig) -> Self {
351 self.retry_config = config;
352 self
353 }
354
355 pub fn generate_correlation_id(&mut self) -> String {
357 self.correlation_counter += 1;
358 format!("cbtop-{}-{}", std::process::id(), self.correlation_counter)
359 }
360
361 pub fn send(&mut self, event: MetricEvent) -> bool {
363 self.current_batch.add(event);
364
365 if self.current_batch.len() >= self.batch_size {
366 self.flush()
367 } else {
368 true
369 }
370 }
371
372 pub fn flush(&mut self) -> bool {
374 if self.current_batch.is_empty() {
375 return true;
376 }
377
378 let result = self.write_batch(&self.current_batch.clone());
379
380 if result {
381 self.health.events_written += self.current_batch.len() as u64;
382 self.health.last_write_ns = Some(now_nanos());
383
384 self.batch_counter += 1;
386 self.current_batch = EventBatch::new(self.batch_counter);
387 true
388 } else {
389 self.health.write_errors += 1;
390 false
391 }
392 }
393
394 fn write_batch(&mut self, batch: &EventBatch) -> bool {
396 match self.sink_type {
397 SinkType::Console => {
398 for event in &batch.events {
399 println!("{}", event.to_json());
400 }
401 true
402 }
403 SinkType::InfluxDb => {
404 self.output_buffer.push(batch.to_influx_batch());
407 true
408 }
409 SinkType::JsonLines => {
410 self.output_buffer.push(batch.to_json_lines());
411 true
412 }
413 SinkType::Kafka => {
414 self.output_buffer.push(batch.to_json_lines());
416 true
417 }
418 }
419 }
420
421 pub fn health(&self) -> &SinkHealth {
423 &self.health
424 }
425
426 pub fn is_healthy(&self) -> bool {
428 self.health.connected && self.health.write_errors == 0
429 }
430
431 pub fn output_buffer(&self) -> &[String] {
433 &self.output_buffer
434 }
435
436 pub fn clear_buffer(&mut self) {
438 self.output_buffer.clear();
439 }
440
441 pub fn events_written(&self) -> u64 {
443 self.health.events_written
444 }
445
446 pub fn pending_count(&self) -> usize {
448 self.current_batch.len()
449 }
450
451 pub fn shutdown(&mut self) -> bool {
453 self.flush()
454 }
455}
456
/// Placeholder compression hook: currently returns an uncompressed copy
/// of `data` (no compression algorithm is applied yet).
pub fn compress_data(data: &[u8]) -> Vec<u8> {
    Vec::from(data)
}
463
464pub fn event_from_sample(metric: &str, value: f64, tags: &[(&str, &str)]) -> MetricEvent {
466 let mut event = MetricEvent::new(metric).with_field("value", value);
467
468 for (key, val) in tags {
469 event = event.with_tag(key, val);
470 }
471
472 event
473}
474
475#[cfg(test)]
476mod tests;