askit_std_agents/
stream.rs

1use agent_stream_kit::{
2    ASKit, AgentConfigs, AgentContext, AgentData, AgentError, AgentOutput, AgentValue, AsAgent,
3    async_trait,
4};
5use askit_macros::askit_agent;
6
7static CATEGORY: &str = "Std/Stream";
8
9static PIN_IN1: &str = "in1";
10static PIN_IN2: &str = "in2";
11static PIN_IN3: &str = "in3";
12static PIN_IN4: &str = "in4";
13static PIN_OUT1: &str = "out1";
14static PIN_OUT2: &str = "out2";
15static PIN_OUT3: &str = "out3";
16static PIN_OUT4: &str = "out4";
17
18/// Receives inputs in any order and, once all are present, emits them sequentially.
19struct SyncAgent {
20    n: usize,
21    in_ports: Vec<String>,
22    input_values: Vec<Option<AgentValue>>,
23    current_id: usize,
24}
25
26impl SyncAgent {
27    fn new_with_n(n: usize) -> Self {
28        let in_ports = (0..n).map(|i| format!("in{}", i + 1)).collect();
29        let input_values = vec![None; n];
30        Self {
31            n,
32            in_ports,
33            input_values,
34            current_id: 0,
35        }
36    }
37
38    async fn process_impl(
39        &mut self,
40        ctx: AgentContext,
41        pin: String,
42        value: AgentValue,
43    ) -> Result<Vec<AgentValue>, AgentError> {
44        // Reset input values if context ID changes
45        let ctx_id = ctx.id();
46        if ctx_id != self.current_id {
47            self.current_id = ctx_id;
48            for slot in &mut self.input_values {
49                *slot = None;
50            }
51        }
52
53        // Store the input value
54        for i in 0..self.n {
55            if pin == self.in_ports[i] {
56                self.input_values[i] = Some(value.clone());
57            }
58        }
59
60        // Check if all inputs are present
61        if self.input_values.iter().any(|v| v.is_none()) {
62            return Ok(Vec::new());
63        }
64
65        // All inputs are present, output in order
66        let mut outputs = Vec::new();
67        for i in 0..self.n {
68            let out_value = self.input_values[i].take().unwrap();
69            outputs.push(out_value);
70        }
71
72        Ok(outputs)
73    }
74}
75
76#[askit_agent(
77    title = "Sync2",
78    category = CATEGORY,
79    inputs = [PIN_IN1, PIN_IN2],
80    outputs = [PIN_OUT1, PIN_OUT2],
81)]
82struct Sync2Agent {
83    data: AgentData,
84    inner: SyncAgent,
85}
86
87#[async_trait]
88impl AsAgent for Sync2Agent {
89    fn new(
90        askit: ASKit,
91        id: String,
92        def_name: String,
93        config: Option<AgentConfigs>,
94    ) -> Result<Self, AgentError> {
95        let data = AgentData::new(askit, id, def_name, config);
96        let inner = SyncAgent::new_with_n(2);
97        Ok(Self { data, inner })
98    }
99
100    async fn process(
101        &mut self,
102        ctx: AgentContext,
103        pin: String,
104        value: AgentValue,
105    ) -> Result<(), AgentError> {
106        let out = self.inner.process_impl(ctx.clone(), pin, value).await?;
107        if out.len() != 2 {
108            return Ok(());
109        }
110        self.try_output(ctx.clone(), PIN_OUT1, out[0].clone())?;
111        self.try_output(ctx, PIN_OUT2, out[1].clone())?;
112
113        Ok(())
114    }
115}
116
117#[askit_agent(
118    title = "Sync3",
119    category = CATEGORY,
120    inputs = [PIN_IN1, PIN_IN2, PIN_IN3],
121    outputs = [PIN_OUT1, PIN_OUT2, PIN_OUT3],
122)]
123struct Sync3Agent {
124    data: AgentData,
125    inner: SyncAgent,
126}
127
128#[async_trait]
129impl AsAgent for Sync3Agent {
130    fn new(
131        askit: ASKit,
132        id: String,
133        def_name: String,
134        config: Option<AgentConfigs>,
135    ) -> Result<Self, AgentError> {
136        let data = AgentData::new(askit, id, def_name, config);
137        let inner = SyncAgent::new_with_n(3);
138        Ok(Self { data, inner })
139    }
140
141    async fn process(
142        &mut self,
143        ctx: AgentContext,
144        pin: String,
145        value: AgentValue,
146    ) -> Result<(), AgentError> {
147        let out = self.inner.process_impl(ctx.clone(), pin, value).await?;
148        if out.len() != 3 {
149            return Ok(());
150        }
151        self.try_output(ctx.clone(), PIN_OUT1, out[0].clone())?;
152        self.try_output(ctx.clone(), PIN_OUT2, out[1].clone())?;
153        self.try_output(ctx, PIN_OUT3, out[2].clone())?;
154
155        Ok(())
156    }
157}
158
159#[askit_agent(
160    title = "Sync4",
161    category = CATEGORY,
162    inputs = [PIN_IN1, PIN_IN2, PIN_IN3, PIN_IN4],
163    outputs = [PIN_OUT1, PIN_OUT2, PIN_OUT3, PIN_OUT4],
164)]
165struct Sync4Agent {
166    data: AgentData,
167    inner: SyncAgent,
168}
169
170#[async_trait]
171impl AsAgent for Sync4Agent {
172    fn new(
173        askit: ASKit,
174        id: String,
175        def_name: String,
176        config: Option<AgentConfigs>,
177    ) -> Result<Self, AgentError> {
178        let data = AgentData::new(askit, id, def_name, config);
179        let inner = SyncAgent::new_with_n(3);
180        Ok(Self { data, inner })
181    }
182
183    async fn process(
184        &mut self,
185        ctx: AgentContext,
186        pin: String,
187        value: AgentValue,
188    ) -> Result<(), AgentError> {
189        let out = self.inner.process_impl(ctx.clone(), pin, value).await?;
190        if out.len() != 4 {
191            return Ok(());
192        }
193        self.try_output(ctx.clone(), PIN_OUT1, out[0].clone())?;
194        self.try_output(ctx.clone(), PIN_OUT2, out[1].clone())?;
195        self.try_output(ctx.clone(), PIN_OUT3, out[2].clone())?;
196        self.try_output(ctx, PIN_OUT4, out[3].clone())?;
197
198        Ok(())
199    }
200}