tame_protocol/log_flow/
flow.rs

1use rill_protocol::flow::core::{Flow, TimedEvent};
2use rill_protocol::flow::location::Location;
3use rill_protocol::io::provider::StreamType;
4use serde::{Deserialize, Serialize};
5use std::collections::VecDeque;
6
7pub const LOCATION: Location = Location::new("system:log_flow");
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct LogFlowState {
11    pub depth: u32,
12    pub logs: VecDeque<String>,
13}
14
15#[allow(clippy::new_without_default)]
16impl LogFlowState {
17    pub fn new() -> Self {
18        Self {
19            depth: 20,
20            logs: VecDeque::new(),
21        }
22    }
23}
24
25impl Flow for LogFlowState {
26    type Action = LogFlowAction;
27    type Event = LogFlowEvent;
28
29    fn stream_type() -> StreamType {
30        StreamType::from("rillrate::agent::log_flow::v0")
31    }
32
33    fn apply(&mut self, event: TimedEvent<Self::Event>) {
34        match event.event {
35            LogFlowEvent::AddLogs { lines } => {
36                self.logs.extend(lines.into_iter());
37                let len = self.logs.len();
38                let depth = self.depth as usize;
39                if len > depth {
40                    let diff = len - depth;
41                    drop(self.logs.drain(0..diff));
42                }
43            }
44            LogFlowEvent::ClearLogs => {
45                self.logs.clear();
46            }
47        }
48    }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub enum LogFlowAction {}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum LogFlowEvent {
56    AddLogs { lines: Vec<String> },
57    ClearLogs,
58}