1use std::io::BufRead;
7use std::collections::HashMap;
8
9use crate::error::{err, ErrorKind, Result};
10use crate::parse::parse_schema_str;
11use crate::types::*;
12
/// State kept by [`StreamReader`] for one stream, from its `#stream start`
/// line until the matching `#stream end` removes it.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Value of the `id=` attribute on `#stream start`; `None` when the line
    /// carried no `id=`.
    pub id: Option<String>,
    /// Mode recorded when the stream was first registered.
    pub mode: StreamMode,
    /// Most recent `#schema` read while this stream was the active one;
    /// `None` until such a line is seen.
    pub schema: Option<Schema>,
    /// Number of records parsed successfully while this stream was active.
    pub record_count: u64,
}
23
/// Mode declared by the `mode=` attribute of a `#stream start` line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMode {
    /// Fallback mode: used when `mode=` is absent or has any value other
    /// than `cdc`.
    Append,
    /// Selected by `mode=cdc`. Presumably "change data capture" (records
    /// tagged as insert/update/delete) — confirm against the format spec.
    Cdc,
}
30
/// Severity keyword carried by a `#status` line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StatusLevel {
    /// Written as `info`; also used by the reader when a `#status` line has
    /// no recognised level keyword.
    Info,
    /// Written as `warn`.
    Warn,
    /// Written as `error`.
    Error,
}
38
/// One item produced by [`StreamReader::next_event`]: either an ordinary
/// parser event or one of the stream-control lines.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamEvent {
    /// A non-streaming event (header, schema, record, block/template lines,
    /// section markers, other directives) passed through unchanged.
    Core(Event),

    /// A `#stream start` line.
    StreamStart {
        /// Value of the `id=` attribute, if present.
        id: Option<String>,
        /// Mode taken from the `mode=` attribute of this line.
        mode: StreamMode,
    },
    /// A `#stream end` line.
    StreamEnd {
        /// Value of the `id=` attribute, if present.
        id: Option<String>,
        /// Value of the `records=` attribute when it parses as a `u64`. It is
        /// reported as written; the reader does not compare it with its own
        /// record count.
        records: Option<u64>,
    },
    /// A `#heartbeat` line with the optional text that followed it.
    Heartbeat(Option<String>),
    /// A `#status` line.
    Status {
        /// Severity keyword found on the line.
        level: StatusLevel,
        /// Free-form text after the level keyword.
        message: String,
    },
    /// A `#pause` line.
    Pause,
    /// A `#resume` line.
    Resume,
    /// Either the text of an `#error` line, or a description of a malformed
    /// record that was skipped while a stream was active.
    Error(String),
}
70
/// Line-by-line reader that turns a buffered text source into
/// [`StreamEvent`]s while tracking open streams and their schemas.
pub struct StreamReader<R: BufRead> {
    /// Underlying source the lines are read from.
    source: R,
    /// Buffer reused for every line read from `source`.
    line_buf: String,
    /// 1-based number of the most recently read line (0 before any read).
    line_num: usize,
    /// Set once the first non-blank line has been consumed as the header.
    header_read: bool,

    /// Streams that have been started and not yet ended, keyed by their
    /// `id=` value (`None` is the key of a stream started without an id).
    streams: HashMap<Option<String>, StreamInfo>,
    /// Key of the stream selected by the latest `#stream start`. The outer
    /// `None` means no stream is active; `Some(None)` means the stream
    /// without an id is active.
    active_stream: Option<Option<String>>,
    /// Schema set by a `#schema` line read while no stream was active;
    /// cleared by a `---` line read while no stream is active.
    current_schema: Option<Schema>,

    /// True between a `#block …` line and the closing `#/block`.
    in_block: bool,
    /// True between a `#template …` line and the closing `#/template`.
    in_template: bool,
}
94
95impl<R: BufRead> StreamReader<R> {
96 pub fn new(source: R) -> Self {
97 Self {
98 source,
99 line_buf: String::new(),
100 line_num: 0,
101 header_read: false,
102 streams: HashMap::new(),
103 active_stream: None,
104 current_schema: None,
105 in_block: false,
106 in_template: false,
107 }
108 }
109
110 pub fn active_stream(&self) -> Option<&StreamInfo> {
112 self.active_stream
113 .as_ref()
114 .and_then(|id| self.streams.get(id))
115 }
116
117 pub fn schema(&self) -> Option<&Schema> {
119 if let Some(stream) = self.active_stream() {
121 if stream.schema.is_some() {
122 return stream.schema.as_ref();
123 }
124 }
125 self.current_schema.as_ref()
126 }
127
128 pub fn line_num(&self) -> usize {
130 self.line_num
131 }
132
133 pub fn next_event(&mut self) -> Result<Option<StreamEvent>> {
135 loop {
136 self.line_buf.clear();
137 let bytes_read = self
138 .source
139 .read_line(&mut self.line_buf)
140 .map_err(|e| err(ErrorKind::UnexpectedEof, self.line_num, e.to_string()))?;
141
142 if bytes_read == 0 {
143 return Ok(None);
145 }
146
147 self.line_num += 1;
148 let line = self.line_buf.trim_end_matches('\n').trim_end_matches('\r');
149
150 let line = if self.line_num == 1 {
152 line.strip_prefix('\u{FEFF}').unwrap_or(line)
153 } else {
154 line
155 };
156
157 if self.in_block {
159 if line.trim_end() == "#/block" {
160 self.in_block = false;
161 return Ok(Some(StreamEvent::Core(Event::BlockEnd)));
162 }
163 return Ok(Some(StreamEvent::Core(Event::BlockLine(line.to_string()))));
164 }
165
166 if self.in_template {
168 if line.trim_end() == "#/template" {
169 self.in_template = false;
170 return Ok(Some(StreamEvent::Core(Event::TemplateEnd)));
171 }
172 return Ok(Some(StreamEvent::Core(Event::TemplateLine(line.to_string()))));
173 }
174
175 if line.trim().is_empty() {
177 continue;
178 }
179
180 if !self.header_read {
182 self.header_read = true;
183 let header = crate::parse::parse_header_public(line, self.line_num)?;
184 return Ok(Some(StreamEvent::Core(Event::Header(header))));
185 }
186
187 if line.starts_with("#!") {
189 continue;
190 }
191
192 let trimmed = line.trim_end();
193
194 if trimmed.starts_with("#stream start") {
198 let attrs = parse_stream_attrs(&trimmed[13..]);
199 let id = attrs.get("id").cloned();
200 let mode = match attrs.get("mode").map(|s| s.as_str()) {
201 Some("cdc") => StreamMode::Cdc,
202 _ => StreamMode::Append,
203 };
204
205 if !self.streams.contains_key(&id) {
207 self.streams.insert(
208 id.clone(),
209 StreamInfo {
210 id: id.clone(),
211 mode,
212 schema: None,
213 record_count: 0,
214 },
215 );
216 }
217 self.active_stream = Some(id.clone());
218
219 return Ok(Some(StreamEvent::StreamStart { id, mode }));
220 }
221
222 if trimmed.starts_with("#stream end") {
224 let attrs = parse_stream_attrs(&trimmed[11..]);
225 let id = attrs.get("id").cloned();
226 let records = attrs
227 .get("records")
228 .and_then(|s| s.parse::<u64>().ok());
229
230 self.streams.remove(&id);
232 if self.active_stream.as_ref() == Some(&id) {
233 self.active_stream = None;
234 }
235
236 return Ok(Some(StreamEvent::StreamEnd { id, records }));
237 }
238
239 if trimmed == "#heartbeat" || trimmed.starts_with("#heartbeat ") {
241 let ts = if trimmed.len() > 11 {
242 Some(trimmed[11..].trim().to_string())
243 } else {
244 None
245 };
246 return Ok(Some(StreamEvent::Heartbeat(ts)));
247 }
248
249 if trimmed.starts_with("#status ") {
251 let rest = &trimmed[8..];
252 let (level, message) = if let Some(msg) = rest.strip_prefix("info ") {
253 (StatusLevel::Info, msg.to_string())
254 } else if let Some(msg) = rest.strip_prefix("warn ") {
255 (StatusLevel::Warn, msg.to_string())
256 } else if let Some(msg) = rest.strip_prefix("error ") {
257 (StatusLevel::Error, msg.to_string())
258 } else {
259 (StatusLevel::Info, rest.to_string())
260 };
261 return Ok(Some(StreamEvent::Status { level, message }));
262 }
263
264 if trimmed == "#pause" {
266 return Ok(Some(StreamEvent::Pause));
267 }
268
269 if trimmed == "#resume" {
271 return Ok(Some(StreamEvent::Resume));
272 }
273
274 if trimmed.starts_with("#error ") {
276 return Ok(Some(StreamEvent::Error(trimmed[7..].to_string())));
277 }
278
279 if trimmed == "---" {
283 if self.active_stream.is_none() {
284 self.current_schema = None;
285 }
286 return Ok(Some(StreamEvent::Core(Event::SectionBreak)));
287 }
288
289 if trimmed.starts_with('§') {
291 let id = &trimmed['§'.len_utf8()..];
292 return Ok(Some(StreamEvent::Core(Event::SectionId(
293 id.trim_end().to_string(),
294 ))));
295 }
296
297 if trimmed.starts_with("#block ") {
299 let rest = &trimmed[7..];
300 let tokens: Vec<&str> = rest.split_whitespace().collect();
301 let block_type = match tokens.first().copied() {
302 Some("code") => BlockType::Code,
303 Some("text") => BlockType::Text,
304 Some("diff") => BlockType::Diff,
305 Some("raw") => BlockType::Raw,
306 Some("template") => BlockType::Template,
307 _ => {
308 return Err(err(
309 ErrorKind::InvalidBlock,
310 self.line_num,
311 "unknown block type",
312 ));
313 }
314 };
315 let mut attrs = Vec::new();
316 for &token in &tokens[1..] {
317 if let Some(eq) = token.find('=') {
318 attrs.push((
319 token[..eq].to_string(),
320 token[eq + 1..].to_string(),
321 ));
322 }
323 }
324 self.in_block = true;
325 return Ok(Some(StreamEvent::Core(Event::BlockStart {
326 block_type,
327 attributes: attrs,
328 })));
329 }
330
331 if trimmed.starts_with("#template ") {
333 let name = trimmed[10..].trim().to_string();
334 self.in_template = true;
335 return Ok(Some(StreamEvent::Core(Event::TemplateStart(name))));
336 }
337
338 if trimmed.starts_with("#schema ") {
340 let schema = parse_schema_str(&trimmed[8..], self.line_num)?;
341
342 if let Some(ref stream_id) = self.active_stream {
344 if let Some(stream) = self.streams.get_mut(stream_id) {
345 stream.schema = Some(schema.clone());
346 }
347 } else {
348 self.current_schema = Some(schema.clone());
349 }
350
351 return Ok(Some(StreamEvent::Core(Event::Schema(schema))));
352 }
353
354 if trimmed == "#recall schema" {
356 return Ok(Some(StreamEvent::Core(Event::Directive(
357 Directive::Recall,
358 ))));
359 }
360
361 if trimmed.starts_with('#') {
363 if let Some(directive) =
364 crate::parse::parse_directive_public(trimmed, self.line_num)?
365 {
366 return Ok(Some(StreamEvent::Core(Event::Directive(directive))));
367 }
368 continue;
369 }
370
371 if let Some(schema) = self.schema().cloned() {
374 match crate::parse::parse_record_public(trimmed, &schema, self.line_num) {
377 Ok(record) => {
378 if let Some(ref stream_id) = self.active_stream {
380 if let Some(stream) = self.streams.get_mut(stream_id) {
381 stream.record_count += 1;
382 }
383 }
384 return Ok(Some(StreamEvent::Core(Event::Record(record))));
385 }
386 Err(e) => {
387 if self.active_stream.is_some() {
389 return Ok(Some(StreamEvent::Error(format!(
390 "Malformed record skipped at line {}: {}",
391 self.line_num, e.message
392 ))));
393 } else {
394 return Err(e);
395 }
396 }
397 }
398 } else {
399 return Err(err(
400 ErrorKind::RecordWithoutSchema,
401 self.line_num,
402 "record found before any #schema",
403 ));
404 }
405 }
406 }
407
408 pub fn collect_events(&mut self) -> Result<Vec<StreamEvent>> {
410 let mut events = Vec::new();
411 while let Some(event) = self.next_event()? {
412 events.push(event);
413 }
414 Ok(events)
415 }
416}
417
418impl StreamReader<std::io::BufReader<std::io::Cursor<String>>> {
420 pub fn from_str(input: &str) -> Self {
421 let cursor = std::io::Cursor::new(input.to_string());
422 Self::new(std::io::BufReader::new(cursor))
423 }
424}
425
426impl<W: std::io::Write> crate::emit::Emitter<W> {
430 pub fn emit_stream_start(
432 &mut self,
433 id: Option<&str>,
434 mode: StreamMode,
435 ) -> std::io::Result<()> {
436 let w = self.writer_mut();
437 write!(w, "#stream start")?;
438 if let Some(id) = id {
439 write!(w, " id={}", id)?;
440 }
441 if mode == StreamMode::Cdc {
442 write!(w, " mode=cdc")?;
443 }
444 writeln!(w)
445 }
446
447 pub fn emit_stream_end(
449 &mut self,
450 id: Option<&str>,
451 records: Option<u64>,
452 ) -> std::io::Result<()> {
453 let w = self.writer_mut();
454 write!(w, "#stream end")?;
455 if let Some(id) = id {
456 write!(w, " id={}", id)?;
457 }
458 if let Some(n) = records {
459 write!(w, " records={}", n)?;
460 }
461 writeln!(w)
462 }
463
464 pub fn emit_heartbeat(&mut self, timestamp: Option<&str>) -> std::io::Result<()> {
466 let w = self.writer_mut();
467 if let Some(ts) = timestamp {
468 writeln!(w, "#heartbeat {}", ts)
469 } else {
470 writeln!(w, "#heartbeat")
471 }
472 }
473
474 pub fn emit_status(&mut self, level: StatusLevel, message: &str) -> std::io::Result<()> {
476 let w = self.writer_mut();
477 let lvl = match level {
478 StatusLevel::Info => "info",
479 StatusLevel::Warn => "warn",
480 StatusLevel::Error => "error",
481 };
482 writeln!(w, "#status {} {}", lvl, message)
483 }
484
485 pub fn emit_pause(&mut self) -> std::io::Result<()> {
487 writeln!(self.writer_mut(), "#pause")
488 }
489
490 pub fn emit_resume(&mut self) -> std::io::Result<()> {
492 writeln!(self.writer_mut(), "#resume")
493 }
494
495 pub fn emit_error(&mut self, message: &str) -> std::io::Result<()> {
497 writeln!(self.writer_mut(), "#error {}", message)
498 }
499}
500
/// Collects the whitespace-separated `key=value` tokens of `rest` into a map.
///
/// Each token is split at its first `=`; tokens without `=` are ignored and a
/// key that appears more than once keeps its last value.
fn parse_stream_attrs(rest: &str) -> HashMap<String, String> {
    rest.split_whitespace()
        .filter_map(|token| token.split_once('='))
        .map(|(key, value)| (key.to_owned(), value.to_owned()))
        .collect()
}
514
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads `input` to the end and returns every event, panicking if the
    /// reader reports an error.
    fn read_all(input: &str) -> Vec<StreamEvent> {
        let mut reader = StreamReader::from_str(input);
        reader.collect_events().unwrap()
    }

    /// Counts the record events in `events`.
    fn record_count(events: &[StreamEvent]) -> usize {
        events
            .iter()
            .filter(|e| matches!(e, StreamEvent::Core(Event::Record(_))))
            .count()
    }

    /// Counts the `StreamEvent::Error` events in `events`.
    fn error_count(events: &[StreamEvent]) -> usize {
        events
            .iter()
            .filter(|e| matches!(e, StreamEvent::Error(_)))
            .count()
    }

    #[test]
    fn test_stream_lifecycle() {
        let input = "\
#!sif v1
#stream start id=build
#schema timestamp:str message:str
2026-03-08T09:00:00Z\tCompiling
2026-03-08T09:00:01Z\tDone
#stream end id=build records=2
";
        let events = read_all(input);

        let start_count = events
            .iter()
            .filter(|e| matches!(e, StreamEvent::StreamStart { .. }))
            .count();
        assert_eq!(start_count, 1);

        // Exactly one end event, carrying the declared record count.
        let declared: Vec<Option<u64>> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::StreamEnd { records, .. } => Some(*records),
                _ => None,
            })
            .collect();
        assert_eq!(declared, vec![Some(2)]);

        assert_eq!(record_count(&events), 2);
    }

    #[test]
    fn test_cdc_stream() {
        let input = "\
#!sif v1
#stream start id=inventory mode=cdc
#schema id:uint:id product:str stock:uint
1\tWidget A\t100
2\tWidget B\t50
Δ1\tWidget A\t97
∅2
#stream end id=inventory records=4
";
        let events = read_all(input);

        let records: Vec<_> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::Core(Event::Record(r)) => Some(r),
                _ => None,
            })
            .collect();
        assert_eq!(records.len(), 4);

        let expected_ops = vec![
            (0, CdcOp::Insert),
            (2, CdcOp::Update),
            (3, CdcOp::Delete),
        ];
        for (index, op) in expected_ops {
            assert_eq!(records[index].cdc_op, op);
        }
    }

    #[test]
    fn test_multiplexed_streams() {
        let input = "\
#!sif v1
#stream start id=build
#schema step:str status:str
#stream start id=tests
#schema name:str passed:bool
#stream start id=build
compile\tok
#stream start id=tests
api_auth\tT
#stream end id=build
#stream end id=tests
";
        let events = read_all(input);

        assert_eq!(record_count(&events), 2);
    }

    #[test]
    fn test_flow_control() {
        let input = "\
#!sif v1
#stream start id=deploy
#schema step:str status:str
build\tcomplete
#pause
#resume
deploy\tin_progress
#stream end id=deploy
";
        let events = read_all(input);

        assert!(events.iter().any(|e| matches!(e, StreamEvent::Pause)));
        assert!(events.iter().any(|e| matches!(e, StreamEvent::Resume)));
    }

    #[test]
    fn test_heartbeat() {
        let input = "\
#!sif v1
#stream start
#schema x:uint
#heartbeat
#heartbeat 2026-03-08T09:00:30Z
1
#stream end
";
        let events = read_all(input);

        let heartbeats: Vec<Option<String>> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::Heartbeat(ts) => Some(ts.clone()),
                _ => None,
            })
            .collect();
        assert_eq!(
            heartbeats,
            vec![None, Some("2026-03-08T09:00:30Z".to_string())]
        );
    }

    #[test]
    fn test_status_levels() {
        let input = "\
#!sif v1
#stream start
#schema x:uint
#status info Stream initialized
#status warn Degraded
#status error Connection lost
#stream end
";
        let events = read_all(input);

        let levels: Vec<StatusLevel> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::Status { level, .. } => Some(*level),
                _ => None,
            })
            .collect();
        assert_eq!(
            levels,
            vec![StatusLevel::Info, StatusLevel::Warn, StatusLevel::Error]
        );
    }

    #[test]
    fn test_schema_evolution() {
        let input = "\
#!sif v1
#stream start id=metrics
#schema timestamp:str cpu:float
2026-03-08T09:00:00Z\t42.5
#schema timestamp:str cpu:float gpu:float
2026-03-08T09:00:01Z\t43.0\t85.0
#stream end id=metrics
";
        let events = read_all(input);

        let schema_count = events
            .iter()
            .filter(|e| matches!(e, StreamEvent::Core(Event::Schema(_))))
            .count();
        assert_eq!(schema_count, 2);

        // Each record must have been parsed with the schema in force at its line.
        let widths: Vec<usize> = events
            .iter()
            .filter_map(|e| match e {
                StreamEvent::Core(Event::Record(r)) => Some(r.values.len()),
                _ => None,
            })
            .collect();
        assert_eq!(widths, vec![2, 3]);
    }

    #[test]
    fn test_malformed_record_recovery() {
        let input = "\
#!sif v1
#stream start
#schema id:uint name:str
1\talice
bad record with wrong fields and not a uint
2\tbob
#stream end
";
        let events = read_all(input);

        assert_eq!(record_count(&events), 2);
        assert_eq!(error_count(&events), 1);
    }

    #[test]
    fn test_section_breaks_in_stream() {
        let input = "\
#!sif v1
#stream start
#schema x:uint
1
---
2
#stream end
";
        let events = read_all(input);

        assert_eq!(record_count(&events), 2);
    }
}