// cbtop/event_streaming/mod.rs
//! Structured Event Streaming (PMAT-043)
//!
//! Stream metrics to time-series databases and event systems.
//!
//! # Features
//!
//! - InfluxDB Line Protocol export
//! - JSON Lines file sink
//! - Batch buffering and compression
//! - Correlation ID tracking
//!
//! # Falsification Criteria (F1351-F1360)
//!
//! See `tests/event_streaming_f1351.rs` for falsification tests.

use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

/// Schema version for event format
pub const SCHEMA_VERSION: u32 = 1;

/// Return the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Falls back to `0` if the system clock is before the epoch (should never
/// happen in practice).
#[inline]
fn now_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}

/// Format a slice of events by applying `formatter` to each element and
/// joining the results with newlines.
fn format_events<T, F: Fn(&T) -> String>(events: &[T], formatter: F) -> String {
    let mut lines = Vec::with_capacity(events.len());
    for event in events {
        lines.push(formatter(event));
    }
    lines.join("\n")
}

/// Default number of events buffered before a batch is flushed.
pub const DEFAULT_BATCH_SIZE: usize = 100;

/// The kind of downstream sink that events are written to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SinkType {
    /// InfluxDB Line Protocol output.
    InfluxDb,
    /// JSON Lines (one JSON object per line) file output.
    JsonLines,
    /// Kafka (simulated in this module — no broker connection is made).
    Kafka,
    /// Console output (for testing).
    Console,
}

56impl SinkType {
57    /// Get sink name
58    pub fn name(&self) -> &'static str {
59        match self {
60            Self::InfluxDb => "influxdb",
61            Self::JsonLines => "jsonlines",
62            Self::Kafka => "kafka",
63            Self::Console => "console",
64        }
65    }
66}
67
/// A single metric event: one measurement with tags (indexed dimensions),
/// numeric fields, a nanosecond timestamp, and optional tracing metadata.
#[derive(Debug, Clone)]
pub struct MetricEvent {
    /// Measurement name (the metric being recorded).
    pub measurement: String,
    /// Tags: indexed key/value dimensions attached to the measurement.
    pub tags: HashMap<String, String>,
    /// Fields: the numeric values recorded for this event.
    pub fields: HashMap<String, f64>,
    /// Event time as nanoseconds since the Unix epoch.
    pub timestamp_ns: u64,
    /// Optional correlation ID for cross-system tracing.
    pub correlation_id: Option<String>,
    /// Version of the event schema (see [`SCHEMA_VERSION`]).
    pub schema_version: u32,
}

85impl MetricEvent {
86    /// Create new event
87    pub fn new(measurement: &str) -> Self {
88        Self {
89            measurement: measurement.to_string(),
90            tags: HashMap::new(),
91            fields: HashMap::new(),
92            timestamp_ns: now_nanos(),
93            correlation_id: None,
94            schema_version: SCHEMA_VERSION,
95        }
96    }
97
98    /// Add tag
99    pub fn with_tag(mut self, key: &str, value: &str) -> Self {
100        self.tags.insert(key.to_string(), value.to_string());
101        self
102    }
103
104    /// Add field
105    pub fn with_field(mut self, key: &str, value: f64) -> Self {
106        self.fields.insert(key.to_string(), value);
107        self
108    }
109
110    /// Set correlation ID
111    pub fn with_correlation_id(mut self, id: &str) -> Self {
112        self.correlation_id = Some(id.to_string());
113        self
114    }
115
116    /// Set timestamp
117    pub fn with_timestamp(mut self, timestamp_ns: u64) -> Self {
118        self.timestamp_ns = timestamp_ns;
119        self
120    }
121
122    /// Format as InfluxDB Line Protocol
123    pub fn to_influx_line(&self) -> String {
124        // measurement,tag1=val1,tag2=val2 field1=val1,field2=val2 timestamp
125        let mut line = self.measurement.clone();
126
127        // Add tags (sorted for consistency)
128        let mut tag_pairs: Vec<_> = self.tags.iter().collect();
129        tag_pairs.sort_by_key(|(k, _)| *k);
130        for (key, value) in tag_pairs {
131            line.push_str(&format!(",{}={}", escape_influx(key), escape_influx(value)));
132        }
133
134        // Add fields
135        line.push(' ');
136        let mut field_pairs: Vec<_> = self.fields.iter().collect();
137        field_pairs.sort_by_key(|(k, _)| *k);
138        let field_str: Vec<String> = field_pairs
139            .iter()
140            .map(|(k, v)| format!("{}={}", k, v))
141            .collect();
142        line.push_str(&field_str.join(","));
143
144        // Add timestamp
145        line.push_str(&format!(" {}", self.timestamp_ns));
146
147        line
148    }
149
150    /// Format as JSON
151    pub fn to_json(&self) -> String {
152        let tags_json: Vec<String> = self
153            .tags
154            .iter()
155            .map(|(k, v)| format!("\"{}\":\"{}\"", k, v))
156            .collect();
157
158        let fields_json: Vec<String> = self
159            .fields
160            .iter()
161            .map(|(k, v)| format!("\"{}\":{}", k, v))
162            .collect();
163
164        let correlation = self
165            .correlation_id
166            .as_ref()
167            .map(|id| format!(",\"correlation_id\":\"{}\"", id))
168            .unwrap_or_default();
169
170        format!(
171            r#"{{"measurement":"{}","tags":{{{}}},"fields":{{{}}},"timestamp_ns":{},"schema_version":{}{}}}"#,
172            self.measurement,
173            tags_json.join(","),
174            fields_json.join(","),
175            self.timestamp_ns,
176            self.schema_version,
177            correlation
178        )
179    }
180}
181
/// Escape a string for use as a tag key, tag value, or field key in the
/// InfluxDB Line Protocol: a backslash is inserted before every space,
/// comma, and equals sign.
fn escape_influx(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        if matches!(c, ' ' | ',' | '=') {
            escaped.push('\\');
        }
        escaped.push(c);
    }
    escaped
}

/// A group of events flushed to the sink together.
#[derive(Debug, Clone)]
pub struct EventBatch {
    /// Events accumulated in this batch.
    pub events: Vec<MetricEvent>,
    /// Identifier assigned by the streamer (increments per batch).
    pub batch_id: u64,
    /// Batch creation time, nanoseconds since the Unix epoch.
    pub created_ns: u64,
}

200impl EventBatch {
201    /// Create new batch
202    pub fn new(batch_id: u64) -> Self {
203        Self {
204            events: Vec::new(),
205            batch_id,
206            created_ns: now_nanos(),
207        }
208    }
209
210    /// Add event
211    pub fn add(&mut self, event: MetricEvent) {
212        self.events.push(event);
213    }
214
215    /// Get batch size
216    pub fn len(&self) -> usize {
217        self.events.len()
218    }
219
220    /// Check if empty
221    pub fn is_empty(&self) -> bool {
222        self.events.is_empty()
223    }
224
225    /// Format as InfluxDB batch
226    pub fn to_influx_batch(&self) -> String {
227        format_events(&self.events, MetricEvent::to_influx_line)
228    }
229
230    /// Format as JSON Lines
231    pub fn to_json_lines(&self) -> String {
232        format_events(&self.events, MetricEvent::to_json)
233    }
234}
235
/// Health counters for a sink.
#[derive(Debug, Clone)]
pub struct SinkHealth {
    /// Whether the sink is considered connected.
    pub connected: bool,
    /// Timestamp (ns since epoch) of the last successful write, if any.
    pub last_write_ns: Option<u64>,
    /// Total events successfully written.
    pub events_written: u64,
    /// Total failed batch writes.
    pub write_errors: u64,
}

249impl Default for SinkHealth {
250    fn default() -> Self {
251        Self {
252            connected: true,
253            last_write_ns: None,
254            events_written: 0,
255            write_errors: 0,
256        }
257    }
258}
259
/// Exponential-backoff retry configuration.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts before giving up.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound on any single delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier applied to the delay for each subsequent attempt.
    pub multiplier: f64,
}

273impl Default for RetryConfig {
274    fn default() -> Self {
275        Self {
276            max_retries: 3,
277            initial_delay_ms: 100,
278            max_delay_ms: 10000,
279            multiplier: 2.0,
280        }
281    }
282}
283
284impl RetryConfig {
285    /// Calculate delay for attempt
286    pub fn delay_for_attempt(&self, attempt: u32) -> u64 {
287        let delay = self.initial_delay_ms as f64 * self.multiplier.powi(attempt as i32);
288        (delay as u64).min(self.max_delay_ms)
289    }
290}
291
/// Buffers metric events into batches and writes them to a configured sink.
#[derive(Debug)]
pub struct EventStreamer {
    /// Destination sink kind.
    sink_type: SinkType,
    /// Number of events per batch before an automatic flush.
    batch_size: usize,
    /// The batch currently being filled.
    current_batch: EventBatch,
    /// Counter used to assign batch IDs.
    batch_counter: u64,
    /// Retry/backoff policy.
    // NOTE(review): stored but not consulted by the visible write path —
    // confirm whether retries are applied elsewhere.
    retry_config: RetryConfig,
    /// Sink health counters.
    health: SinkHealth,
    /// Whether batch compression is requested.
    // NOTE(review): flag is stored but not applied by `write_batch` here.
    compression: bool,
    /// Counter used to generate unique correlation IDs.
    correlation_counter: u64,
    /// Captured formatted output for buffering sinks (used by tests).
    output_buffer: Vec<String>,
}

/// The default streamer targets the console sink — a safe choice for tests
/// since no external system is contacted.
impl Default for EventStreamer {
    fn default() -> Self {
        Self::new(SinkType::Console)
    }
}

321impl EventStreamer {
322    /// Create new streamer
323    pub fn new(sink_type: SinkType) -> Self {
324        Self {
325            sink_type,
326            batch_size: DEFAULT_BATCH_SIZE,
327            current_batch: EventBatch::new(0),
328            batch_counter: 0,
329            retry_config: RetryConfig::default(),
330            health: SinkHealth::default(),
331            compression: false,
332            correlation_counter: 0,
333            output_buffer: Vec::new(),
334        }
335    }
336
337    /// Set batch size
338    pub fn with_batch_size(mut self, size: usize) -> Self {
339        self.batch_size = size.max(1);
340        self
341    }
342
343    /// Enable compression
344    pub fn with_compression(mut self, enabled: bool) -> Self {
345        self.compression = enabled;
346        self
347    }
348
349    /// Set retry config
350    pub fn with_retry(mut self, config: RetryConfig) -> Self {
351        self.retry_config = config;
352        self
353    }
354
355    /// Generate correlation ID
356    pub fn generate_correlation_id(&mut self) -> String {
357        self.correlation_counter += 1;
358        format!("cbtop-{}-{}", std::process::id(), self.correlation_counter)
359    }
360
361    /// Send event
362    pub fn send(&mut self, event: MetricEvent) -> bool {
363        self.current_batch.add(event);
364
365        if self.current_batch.len() >= self.batch_size {
366            self.flush()
367        } else {
368            true
369        }
370    }
371
372    /// Flush current batch
373    pub fn flush(&mut self) -> bool {
374        if self.current_batch.is_empty() {
375            return true;
376        }
377
378        let result = self.write_batch(&self.current_batch.clone());
379
380        if result {
381            self.health.events_written += self.current_batch.len() as u64;
382            self.health.last_write_ns = Some(now_nanos());
383
384            // Start new batch
385            self.batch_counter += 1;
386            self.current_batch = EventBatch::new(self.batch_counter);
387            true
388        } else {
389            self.health.write_errors += 1;
390            false
391        }
392    }
393
394    /// Write batch to sink
395    fn write_batch(&mut self, batch: &EventBatch) -> bool {
396        match self.sink_type {
397            SinkType::Console => {
398                for event in &batch.events {
399                    println!("{}", event.to_json());
400                }
401                true
402            }
403            SinkType::InfluxDb => {
404                // In production, this would make HTTP request
405                // For now, store in buffer
406                self.output_buffer.push(batch.to_influx_batch());
407                true
408            }
409            SinkType::JsonLines => {
410                self.output_buffer.push(batch.to_json_lines());
411                true
412            }
413            SinkType::Kafka => {
414                // Simulated Kafka produce
415                self.output_buffer.push(batch.to_json_lines());
416                true
417            }
418        }
419    }
420
421    /// Get health status
422    pub fn health(&self) -> &SinkHealth {
423        &self.health
424    }
425
426    /// Check if healthy
427    pub fn is_healthy(&self) -> bool {
428        self.health.connected && self.health.write_errors == 0
429    }
430
431    /// Get output buffer (for testing)
432    pub fn output_buffer(&self) -> &[String] {
433        &self.output_buffer
434    }
435
436    /// Clear output buffer
437    pub fn clear_buffer(&mut self) {
438        self.output_buffer.clear();
439    }
440
441    /// Get events written count
442    pub fn events_written(&self) -> u64 {
443        self.health.events_written
444    }
445
446    /// Get pending events count
447    pub fn pending_count(&self) -> usize {
448        self.current_batch.len()
449    }
450
451    /// Graceful shutdown - flush remaining events
452    pub fn shutdown(&mut self) -> bool {
453        self.flush()
454    }
455}
456
/// Pass-through "compression" placeholder.
///
/// Returns the input bytes unchanged; a production implementation would use
/// a real codec such as flate2 (deflate) or lz4. Note that despite the
/// historical mention of run-length encoding, no encoding is performed.
pub fn compress_data(data: &[u8]) -> Vec<u8> {
    data.to_owned()
}

464/// Create event from performance sample
465pub fn event_from_sample(metric: &str, value: f64, tags: &[(&str, &str)]) -> MetricEvent {
466    let mut event = MetricEvent::new(metric).with_field("value", value);
467
468    for (key, val) in tags {
469        event = event.with_tag(key, val);
470    }
471
472    event
473}
474
#[cfg(test)]
mod tests;