tame_protocol/log_flow/
flow.rs1use rill_protocol::flow::core::{Flow, TimedEvent};
2use rill_protocol::flow::location::Location;
3use rill_protocol::io::provider::StreamType;
4use serde::{Deserialize, Serialize};
5use std::collections::VecDeque;
6
7pub const LOCATION: Location = Location::new("system:log_flow");
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct LogFlowState {
11 pub depth: u32,
12 pub logs: VecDeque<String>,
13}
14
15#[allow(clippy::new_without_default)]
16impl LogFlowState {
17 pub fn new() -> Self {
18 Self {
19 depth: 20,
20 logs: VecDeque::new(),
21 }
22 }
23}
24
25impl Flow for LogFlowState {
26 type Action = LogFlowAction;
27 type Event = LogFlowEvent;
28
29 fn stream_type() -> StreamType {
30 StreamType::from("rillrate::agent::log_flow::v0")
31 }
32
33 fn apply(&mut self, event: TimedEvent<Self::Event>) {
34 match event.event {
35 LogFlowEvent::AddLogs { lines } => {
36 self.logs.extend(lines.into_iter());
37 let len = self.logs.len();
38 let depth = self.depth as usize;
39 if len > depth {
40 let diff = len - depth;
41 drop(self.logs.drain(0..diff));
42 }
43 }
44 LogFlowEvent::ClearLogs => {
45 self.logs.clear();
46 }
47 }
48 }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub enum LogFlowAction {}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum LogFlowEvent {
56 AddLogs { lines: Vec<String> },
57 ClearLogs,
58}