Skip to main content

sif_parser/
streaming.rs

1// SIF Streaming Protocol v1 — Stream lifecycle, CDC, multiplexing,
2// flow control, heartbeat, status, schema evolution.
3//
4// Built from SIF-STREAMING.md specification.
5
6use std::io::BufRead;
7use std::collections::HashMap;
8
9use crate::error::{err, ErrorKind, Result};
10use crate::parse::parse_schema_str;
11use crate::types::*;
12
13// ── Streaming Types ─────────────────────────────────────────────────
14
15/// Metadata for an active stream.
16#[derive(Debug, Clone)]
17pub struct StreamInfo {
18    pub id: Option<String>,
19    pub mode: StreamMode,
20    pub schema: Option<Schema>,
21    pub record_count: u64,
22}
23
/// How a stream's records are interpreted.
///
/// `#stream start` without a `mode=` attribute yields [`StreamMode::Append`];
/// `mode=cdc` selects [`StreamMode::Cdc`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamMode {
    /// Append-only stream (the default).
    Append,
    /// Change-data-capture stream (`mode=cdc`).
    Cdc,
}
30
/// Severity carried by a `#status info|warn|error …` directive.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StatusLevel {
    /// `#status info …`
    Info,
    /// `#status warn …`
    Warn,
    /// `#status error …`
    Error,
}
38
39/// Events emitted by the streaming reader, extending the core Event
40/// set with stream-specific events.
41#[derive(Debug, Clone, PartialEq)]
42pub enum StreamEvent {
43    /// Core SIF event (header, schema, record, directive, block, etc.)
44    Core(Event),
45
46    /// `#stream start` with optional id and mode.
47    StreamStart {
48        id: Option<String>,
49        mode: StreamMode,
50    },
51    /// `#stream end` with optional id and record count.
52    StreamEnd {
53        id: Option<String>,
54        records: Option<u64>,
55    },
56    /// `#heartbeat` with optional timestamp.
57    Heartbeat(Option<String>),
58    /// `#status info|warn|error <text>`.
59    Status {
60        level: StatusLevel,
61        message: String,
62    },
63    /// `#pause`
64    Pause,
65    /// `#resume`
66    Resume,
67    /// `#error <text>` (document-level error, non-fatal).
68    Error(String),
69}
70
71// ── Stream Reader ───────────────────────────────────────────────────
72
73/// A streaming SIF reader with full streaming protocol support.
74///
75/// Handles stream lifecycle, multiplexed streams, CDC mode,
76/// schema evolution, flow control, and heartbeat.
77pub struct StreamReader<R: BufRead> {
78    source: R,
79    line_buf: String,
80    line_num: usize,
81    header_read: bool,
82
83    /// Active streams keyed by id (None key = unnamed stream).
84    streams: HashMap<Option<String>, StreamInfo>,
85    /// Currently active stream id.
86    active_stream: Option<Option<String>>,
87    /// Current schema (from active stream or standalone section).
88    current_schema: Option<Schema>,
89
90    /// Block/template state.
91    in_block: bool,
92    in_template: bool,
93}
94
95impl<R: BufRead> StreamReader<R> {
96    pub fn new(source: R) -> Self {
97        Self {
98            source,
99            line_buf: String::new(),
100            line_num: 0,
101            header_read: false,
102            streams: HashMap::new(),
103            active_stream: None,
104            current_schema: None,
105            in_block: false,
106            in_template: false,
107        }
108    }
109
110    /// Returns the currently active stream info, if any.
111    pub fn active_stream(&self) -> Option<&StreamInfo> {
112        self.active_stream
113            .as_ref()
114            .and_then(|id| self.streams.get(id))
115    }
116
117    /// Returns the current schema (from active stream or section).
118    pub fn schema(&self) -> Option<&Schema> {
119        // Prefer active stream's schema, fall back to section schema.
120        if let Some(stream) = self.active_stream() {
121            if stream.schema.is_some() {
122                return stream.schema.as_ref();
123            }
124        }
125        self.current_schema.as_ref()
126    }
127
128    /// Current line number.
129    pub fn line_num(&self) -> usize {
130        self.line_num
131    }
132
133    /// Read the next event.
134    pub fn next_event(&mut self) -> Result<Option<StreamEvent>> {
135        loop {
136            self.line_buf.clear();
137            let bytes_read = self
138                .source
139                .read_line(&mut self.line_buf)
140                .map_err(|e| err(ErrorKind::UnexpectedEof, self.line_num, e.to_string()))?;
141
142            if bytes_read == 0 {
143                // EOF — treat as implicit #stream end for all open streams.
144                return Ok(None);
145            }
146
147            self.line_num += 1;
148            let line = self.line_buf.trim_end_matches('\n').trim_end_matches('\r');
149
150            // BOM handling on first line
151            let line = if self.line_num == 1 {
152                line.strip_prefix('\u{FEFF}').unwrap_or(line)
153            } else {
154                line
155            };
156
157            // Inside block
158            if self.in_block {
159                if line.trim_end() == "#/block" {
160                    self.in_block = false;
161                    return Ok(Some(StreamEvent::Core(Event::BlockEnd)));
162                }
163                return Ok(Some(StreamEvent::Core(Event::BlockLine(line.to_string()))));
164            }
165
166            // Inside template
167            if self.in_template {
168                if line.trim_end() == "#/template" {
169                    self.in_template = false;
170                    return Ok(Some(StreamEvent::Core(Event::TemplateEnd)));
171                }
172                return Ok(Some(StreamEvent::Core(Event::TemplateLine(line.to_string()))));
173            }
174
175            // Skip empty lines
176            if line.trim().is_empty() {
177                continue;
178            }
179
180            // Header
181            if !self.header_read {
182                self.header_read = true;
183                let header = crate::parse::parse_header_public(line, self.line_num)?;
184                return Ok(Some(StreamEvent::Core(Event::Header(header))));
185            }
186
187            // Skip #! after header
188            if line.starts_with("#!") {
189                continue;
190            }
191
192            let trimmed = line.trim_end();
193
194            // ── Stream-specific directives ──────────────────────
195
196            // #stream start
197            if trimmed.starts_with("#stream start") {
198                let attrs = parse_stream_attrs(&trimmed[13..]);
199                let id = attrs.get("id").cloned();
200                let mode = match attrs.get("mode").map(|s| s.as_str()) {
201                    Some("cdc") => StreamMode::Cdc,
202                    _ => StreamMode::Append,
203                };
204
205                // If this stream already exists, this is a switch (reactivation).
206                if !self.streams.contains_key(&id) {
207                    self.streams.insert(
208                        id.clone(),
209                        StreamInfo {
210                            id: id.clone(),
211                            mode,
212                            schema: None,
213                            record_count: 0,
214                        },
215                    );
216                }
217                self.active_stream = Some(id.clone());
218
219                return Ok(Some(StreamEvent::StreamStart { id, mode }));
220            }
221
222            // #stream end
223            if trimmed.starts_with("#stream end") {
224                let attrs = parse_stream_attrs(&trimmed[11..]);
225                let id = attrs.get("id").cloned();
226                let records = attrs
227                    .get("records")
228                    .and_then(|s| s.parse::<u64>().ok());
229
230                // Remove the stream.
231                self.streams.remove(&id);
232                if self.active_stream.as_ref() == Some(&id) {
233                    self.active_stream = None;
234                }
235
236                return Ok(Some(StreamEvent::StreamEnd { id, records }));
237            }
238
239            // #heartbeat
240            if trimmed == "#heartbeat" || trimmed.starts_with("#heartbeat ") {
241                let ts = if trimmed.len() > 11 {
242                    Some(trimmed[11..].trim().to_string())
243                } else {
244                    None
245                };
246                return Ok(Some(StreamEvent::Heartbeat(ts)));
247            }
248
249            // #status
250            if trimmed.starts_with("#status ") {
251                let rest = &trimmed[8..];
252                let (level, message) = if let Some(msg) = rest.strip_prefix("info ") {
253                    (StatusLevel::Info, msg.to_string())
254                } else if let Some(msg) = rest.strip_prefix("warn ") {
255                    (StatusLevel::Warn, msg.to_string())
256                } else if let Some(msg) = rest.strip_prefix("error ") {
257                    (StatusLevel::Error, msg.to_string())
258                } else {
259                    (StatusLevel::Info, rest.to_string())
260                };
261                return Ok(Some(StreamEvent::Status { level, message }));
262            }
263
264            // #pause
265            if trimmed == "#pause" {
266                return Ok(Some(StreamEvent::Pause));
267            }
268
269            // #resume
270            if trimmed == "#resume" {
271                return Ok(Some(StreamEvent::Resume));
272            }
273
274            // #error
275            if trimmed.starts_with("#error ") {
276                return Ok(Some(StreamEvent::Error(trimmed[7..].to_string())));
277            }
278
279            // ── Core SIF directives ─────────────────────────────
280
281            // Section break — schema persists in streams, resets outside
282            if trimmed == "---" {
283                if self.active_stream.is_none() {
284                    self.current_schema = None;
285                }
286                return Ok(Some(StreamEvent::Core(Event::SectionBreak)));
287            }
288
289            // Section identifier
290            if trimmed.starts_with('§') {
291                let id = &trimmed['§'.len_utf8()..];
292                return Ok(Some(StreamEvent::Core(Event::SectionId(
293                    id.trim_end().to_string(),
294                ))));
295            }
296
297            // Block start
298            if trimmed.starts_with("#block ") {
299                let rest = &trimmed[7..];
300                let tokens: Vec<&str> = rest.split_whitespace().collect();
301                let block_type = match tokens.first().copied() {
302                    Some("code") => BlockType::Code,
303                    Some("text") => BlockType::Text,
304                    Some("diff") => BlockType::Diff,
305                    Some("raw") => BlockType::Raw,
306                    Some("template") => BlockType::Template,
307                    _ => {
308                        return Err(err(
309                            ErrorKind::InvalidBlock,
310                            self.line_num,
311                            "unknown block type",
312                        ));
313                    }
314                };
315                let mut attrs = Vec::new();
316                for &token in &tokens[1..] {
317                    if let Some(eq) = token.find('=') {
318                        attrs.push((
319                            token[..eq].to_string(),
320                            token[eq + 1..].to_string(),
321                        ));
322                    }
323                }
324                self.in_block = true;
325                return Ok(Some(StreamEvent::Core(Event::BlockStart {
326                    block_type,
327                    attributes: attrs,
328                })));
329            }
330
331            // Template start
332            if trimmed.starts_with("#template ") {
333                let name = trimmed[10..].trim().to_string();
334                self.in_template = true;
335                return Ok(Some(StreamEvent::Core(Event::TemplateStart(name))));
336            }
337
338            // Schema
339            if trimmed.starts_with("#schema ") {
340                let schema = parse_schema_str(&trimmed[8..], self.line_num)?;
341
342                // Store in active stream if any, otherwise in section.
343                if let Some(ref stream_id) = self.active_stream {
344                    if let Some(stream) = self.streams.get_mut(stream_id) {
345                        stream.schema = Some(schema.clone());
346                    }
347                } else {
348                    self.current_schema = Some(schema.clone());
349                }
350
351                return Ok(Some(StreamEvent::Core(Event::Schema(schema))));
352            }
353
354            // #recall schema — no-op
355            if trimmed == "#recall schema" {
356                return Ok(Some(StreamEvent::Core(Event::Directive(
357                    Directive::Recall,
358                ))));
359            }
360
361            // Other directives
362            if trimmed.starts_with('#') {
363                if let Some(directive) =
364                    crate::parse::parse_directive_public(trimmed, self.line_num)?
365                {
366                    return Ok(Some(StreamEvent::Core(Event::Directive(directive))));
367                }
368                continue;
369            }
370
371            // ── Record ──────────────────────────────────────────
372
373            if let Some(schema) = self.schema().cloned() {
374                // In stream mode, malformed records should be skipped
375                // with a diagnostic, not cause a fatal error.
376                match crate::parse::parse_record_public(trimmed, &schema, self.line_num) {
377                    Ok(record) => {
378                        // Increment record count for active stream.
379                        if let Some(ref stream_id) = self.active_stream {
380                            if let Some(stream) = self.streams.get_mut(stream_id) {
381                                stream.record_count += 1;
382                            }
383                        }
384                        return Ok(Some(StreamEvent::Core(Event::Record(record))));
385                    }
386                    Err(e) => {
387                        // Stream mode: skip malformed record, emit error.
388                        if self.active_stream.is_some() {
389                            return Ok(Some(StreamEvent::Error(format!(
390                                "Malformed record skipped at line {}: {}",
391                                self.line_num, e.message
392                            ))));
393                        } else {
394                            return Err(e);
395                        }
396                    }
397                }
398            } else {
399                return Err(err(
400                    ErrorKind::RecordWithoutSchema,
401                    self.line_num,
402                    "record found before any #schema",
403                ));
404            }
405        }
406    }
407
408    /// Collect all events.
409    pub fn collect_events(&mut self) -> Result<Vec<StreamEvent>> {
410        let mut events = Vec::new();
411        while let Some(event) = self.next_event()? {
412            events.push(event);
413        }
414        Ok(events)
415    }
416}
417
418/// Create a StreamReader from a string.
419impl StreamReader<std::io::BufReader<std::io::Cursor<String>>> {
420    pub fn from_str(input: &str) -> Self {
421        let cursor = std::io::Cursor::new(input.to_string());
422        Self::new(std::io::BufReader::new(cursor))
423    }
424}
425
426// ── Stream Emitter ──────────────────────────────────────────────────
427
428/// Extension methods for emitting streaming directives.
429impl<W: std::io::Write> crate::emit::Emitter<W> {
430    /// Write `#stream start` with optional attributes.
431    pub fn emit_stream_start(
432        &mut self,
433        id: Option<&str>,
434        mode: StreamMode,
435    ) -> std::io::Result<()> {
436        let w = self.writer_mut();
437        write!(w, "#stream start")?;
438        if let Some(id) = id {
439            write!(w, " id={}", id)?;
440        }
441        if mode == StreamMode::Cdc {
442            write!(w, " mode=cdc")?;
443        }
444        writeln!(w)
445    }
446
447    /// Write `#stream end` with optional attributes.
448    pub fn emit_stream_end(
449        &mut self,
450        id: Option<&str>,
451        records: Option<u64>,
452    ) -> std::io::Result<()> {
453        let w = self.writer_mut();
454        write!(w, "#stream end")?;
455        if let Some(id) = id {
456            write!(w, " id={}", id)?;
457        }
458        if let Some(n) = records {
459            write!(w, " records={}", n)?;
460        }
461        writeln!(w)
462    }
463
464    /// Write `#heartbeat` with optional timestamp.
465    pub fn emit_heartbeat(&mut self, timestamp: Option<&str>) -> std::io::Result<()> {
466        let w = self.writer_mut();
467        if let Some(ts) = timestamp {
468            writeln!(w, "#heartbeat {}", ts)
469        } else {
470            writeln!(w, "#heartbeat")
471        }
472    }
473
474    /// Write `#status <level> <message>`.
475    pub fn emit_status(&mut self, level: StatusLevel, message: &str) -> std::io::Result<()> {
476        let w = self.writer_mut();
477        let lvl = match level {
478            StatusLevel::Info => "info",
479            StatusLevel::Warn => "warn",
480            StatusLevel::Error => "error",
481        };
482        writeln!(w, "#status {} {}", lvl, message)
483    }
484
485    /// Write `#pause`.
486    pub fn emit_pause(&mut self) -> std::io::Result<()> {
487        writeln!(self.writer_mut(), "#pause")
488    }
489
490    /// Write `#resume`.
491    pub fn emit_resume(&mut self) -> std::io::Result<()> {
492        writeln!(self.writer_mut(), "#resume")
493    }
494
495    /// Write `#error <message>`.
496    pub fn emit_error(&mut self, message: &str) -> std::io::Result<()> {
497        writeln!(self.writer_mut(), "#error {}", message)
498    }
499}
500
501// ── Helpers ─────────────────────────────────────────────────────────
502
/// Parse whitespace-separated `key=value` tokens into a map.
///
/// - Tokens without `=` are ignored.
/// - Each token is split at its first `=`, so a value may itself contain `=`.
/// - When a key repeats, the last occurrence wins.
fn parse_stream_attrs(rest: &str) -> HashMap<String, String> {
    rest.split_whitespace()
        .filter_map(|token| token.split_once('='))
        .map(|(key, value)| (key.to_owned(), value.to_owned()))
        .collect()
}
514
515// ── Tests ───────────────────────────────────────────────────────────
516
#[cfg(test)]
mod tests {
    use super::*;

    /// Join `lines` into a newline-terminated SIF document.
    fn sif(lines: &[&str]) -> String {
        let mut doc = lines.join("\n");
        doc.push('\n');
        doc
    }

    /// Run the reader over `input` to completion, panicking on a hard error.
    fn read_all(input: &str) -> Vec<StreamEvent> {
        StreamReader::from_str(input)
            .collect_events()
            .expect("input should parse without a fatal error")
    }

    /// Count the events matching `pred`.
    fn count(events: &[StreamEvent], pred: impl Fn(&StreamEvent) -> bool) -> usize {
        events.iter().filter(|&e| pred(e)).count()
    }

    fn is_record(event: &StreamEvent) -> bool {
        matches!(event, StreamEvent::Core(Event::Record(_)))
    }

    #[test]
    fn test_stream_lifecycle() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start id=build",
            "#schema timestamp:str message:str",
            "2026-03-08T09:00:00Z\tCompiling",
            "2026-03-08T09:00:01Z\tDone",
            "#stream end id=build records=2",
        ]));

        assert_eq!(
            count(&events, |e| matches!(e, StreamEvent::StreamStart { .. })),
            1
        );

        let end_counts: Vec<Option<u64>> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::StreamEnd { records, .. } => Some(*records),
                _ => None,
            })
            .collect();
        assert_eq!(end_counts, vec![Some(2)]);

        assert_eq!(count(&events, is_record), 2);
    }

    #[test]
    fn test_cdc_stream() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start id=inventory mode=cdc",
            "#schema id:uint:id product:str stock:uint",
            "1\tWidget A\t100",
            "2\tWidget B\t50",
            "Δ1\tWidget A\t97",
            "∅2",
            "#stream end id=inventory records=4",
        ]));

        let rows: Vec<_> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::Core(Event::Record(r)) => Some(r),
                _ => None,
            })
            .collect();
        assert_eq!(rows.len(), 4);
        assert_eq!(rows[0].cdc_op, CdcOp::Insert);
        assert_eq!(rows[2].cdc_op, CdcOp::Update);
        assert_eq!(rows[3].cdc_op, CdcOp::Delete);
    }

    #[test]
    fn test_multiplexed_streams() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start id=build",
            "#schema step:str status:str",
            "#stream start id=tests",
            "#schema name:str passed:bool",
            "#stream start id=build",
            "compile\tok",
            "#stream start id=tests",
            "api_auth\tT",
            "#stream end id=build",
            "#stream end id=tests",
        ]));

        assert_eq!(count(&events, is_record), 2);
    }

    #[test]
    fn test_flow_control() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start id=deploy",
            "#schema step:str status:str",
            "build\tcomplete",
            "#pause",
            "#resume",
            "deploy\tin_progress",
            "#stream end id=deploy",
        ]));

        assert!(count(&events, |e| matches!(e, StreamEvent::Pause)) >= 1);
        assert!(count(&events, |e| matches!(e, StreamEvent::Resume)) >= 1);
    }

    #[test]
    fn test_heartbeat() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start",
            "#schema x:uint",
            "#heartbeat",
            "#heartbeat 2026-03-08T09:00:30Z",
            "1",
            "#stream end",
        ]));

        let beats: Vec<Option<String>> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::Heartbeat(ts) => Some(ts.clone()),
                _ => None,
            })
            .collect();
        assert_eq!(
            beats,
            vec![None, Some(String::from("2026-03-08T09:00:30Z"))]
        );
    }

    #[test]
    fn test_status_levels() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start",
            "#schema x:uint",
            "#status info Stream initialized",
            "#status warn Degraded",
            "#status error Connection lost",
            "#stream end",
        ]));

        let levels: Vec<StatusLevel> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::Status { level, .. } => Some(*level),
                _ => None,
            })
            .collect();
        assert_eq!(
            levels,
            vec![StatusLevel::Info, StatusLevel::Warn, StatusLevel::Error]
        );
    }

    #[test]
    fn test_schema_evolution() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start id=metrics",
            "#schema timestamp:str cpu:float",
            "2026-03-08T09:00:00Z\t42.5",
            "#schema timestamp:str cpu:float gpu:float",
            "2026-03-08T09:00:01Z\t43.0\t85.0",
            "#stream end id=metrics",
        ]));

        assert_eq!(
            count(&events, |e| matches!(e, StreamEvent::Core(Event::Schema(_)))),
            2
        );

        let widths: Vec<usize> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::Core(Event::Record(r)) => Some(r.values.len()),
                _ => None,
            })
            .collect();
        assert_eq!(widths, vec![2, 3]);
    }

    #[test]
    fn test_malformed_record_recovery() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start",
            "#schema id:uint name:str",
            "1\talice",
            "bad record with wrong fields and not a uint",
            "2\tbob",
            "#stream end",
        ]));

        // The two valid records survive; the bad one becomes a single Error event.
        assert_eq!(count(&events, is_record), 2);
        assert_eq!(count(&events, |e| matches!(e, StreamEvent::Error(_))), 1);
    }

    #[test]
    fn test_section_breaks_in_stream() {
        let events = read_all(&sif(&[
            "#!sif v1",
            "#stream start",
            "#schema x:uint",
            "1",
            "---",
            "2",
            "#stream end",
        ]));

        // The stream's schema persists across the section break.
        assert_eq!(count(&events, is_record), 2);
    }
}