1use agent_stream_kit::{
2 ASKit, AgentConfigs, AgentContext, AgentData, AgentError, AgentOutput, AgentValue, AsAgent,
3 async_trait,
4};
5use askit_macros::askit_agent;
6
/// Category string passed as `category` to every `#[askit_agent]` definition below.
static CATEGORY: &str = "Std/Stream";

// Input pin names. `SyncAgent::new_with_n` generates the same names with
// `format!("in{}", i + 1)`, so these must keep the `in<N>` form.
static PIN_IN1: &str = "in1";
static PIN_IN2: &str = "in2";
static PIN_IN3: &str = "in3";
static PIN_IN4: &str = "in4";
// Output pin names. The sync agents emit the value received on `in<N>` on `out<N>`.
static PIN_OUT1: &str = "out1";
static PIN_OUT2: &str = "out2";
static PIN_OUT3: &str = "out3";
static PIN_OUT4: &str = "out4";
17
/// State shared by the fixed-arity sync agents: holds one value per input pin
/// until every pin has received a value for the current context id.
struct SyncAgent {
    /// Number of input pins; `in_ports` and `input_values` are created with this length.
    n: usize,
    /// Input pin names, generated as `"in1"` through `"in<n>"`.
    in_ports: Vec<String>,
    /// Value stored for each pin, in pin order; `None` while that pin has not fired.
    input_values: Vec<Option<AgentValue>>,
    /// Context id (as returned by `AgentContext::id`) the stored values belong to.
    current_id: usize,
}
25
26impl SyncAgent {
27 fn new_with_n(n: usize) -> Self {
28 let in_ports = (0..n).map(|i| format!("in{}", i + 1)).collect();
29 let input_values = vec![None; n];
30 Self {
31 n,
32 in_ports,
33 input_values,
34 current_id: 0,
35 }
36 }
37
38 async fn process_impl(
39 &mut self,
40 ctx: AgentContext,
41 pin: String,
42 value: AgentValue,
43 ) -> Result<Vec<AgentValue>, AgentError> {
44 let ctx_id = ctx.id();
46 if ctx_id != self.current_id {
47 self.current_id = ctx_id;
48 for slot in &mut self.input_values {
49 *slot = None;
50 }
51 }
52
53 for i in 0..self.n {
55 if pin == self.in_ports[i] {
56 self.input_values[i] = Some(value.clone());
57 }
58 }
59
60 if self.input_values.iter().any(|v| v.is_none()) {
62 return Ok(Vec::new());
63 }
64
65 let mut outputs = Vec::new();
67 for i in 0..self.n {
68 let out_value = self.input_values[i].take().unwrap();
69 outputs.push(out_value);
70 }
71
72 Ok(outputs)
73 }
74}
75
// Two-input sync agent: declared with title "Sync2" under `CATEGORY`,
// input pins `in1`/`in2` and output pins `out1`/`out2`.
#[askit_agent(
    title = "Sync2",
    category = CATEGORY,
    inputs = [PIN_IN1, PIN_IN2],
    outputs = [PIN_OUT1, PIN_OUT2],
)]
struct Sync2Agent {
    // Built by `AgentData::new` from the constructor arguments.
    data: AgentData,
    // Holds the values received on the input pins until both are present.
    inner: SyncAgent,
}
86
87#[async_trait]
88impl AsAgent for Sync2Agent {
89 fn new(
90 askit: ASKit,
91 id: String,
92 def_name: String,
93 config: Option<AgentConfigs>,
94 ) -> Result<Self, AgentError> {
95 let data = AgentData::new(askit, id, def_name, config);
96 let inner = SyncAgent::new_with_n(2);
97 Ok(Self { data, inner })
98 }
99
100 async fn process(
101 &mut self,
102 ctx: AgentContext,
103 pin: String,
104 value: AgentValue,
105 ) -> Result<(), AgentError> {
106 let out = self.inner.process_impl(ctx.clone(), pin, value).await?;
107 if out.len() != 2 {
108 return Ok(());
109 }
110 self.try_output(ctx.clone(), PIN_OUT1, out[0].clone())?;
111 self.try_output(ctx, PIN_OUT2, out[1].clone())?;
112
113 Ok(())
114 }
115}
116
// Three-input sync agent: declared with title "Sync3" under `CATEGORY`,
// input pins `in1`..`in3` and output pins `out1`..`out3`.
#[askit_agent(
    title = "Sync3",
    category = CATEGORY,
    inputs = [PIN_IN1, PIN_IN2, PIN_IN3],
    outputs = [PIN_OUT1, PIN_OUT2, PIN_OUT3],
)]
struct Sync3Agent {
    // Built by `AgentData::new` from the constructor arguments.
    data: AgentData,
    // Holds the values received on the input pins until all three are present.
    inner: SyncAgent,
}
127
128#[async_trait]
129impl AsAgent for Sync3Agent {
130 fn new(
131 askit: ASKit,
132 id: String,
133 def_name: String,
134 config: Option<AgentConfigs>,
135 ) -> Result<Self, AgentError> {
136 let data = AgentData::new(askit, id, def_name, config);
137 let inner = SyncAgent::new_with_n(3);
138 Ok(Self { data, inner })
139 }
140
141 async fn process(
142 &mut self,
143 ctx: AgentContext,
144 pin: String,
145 value: AgentValue,
146 ) -> Result<(), AgentError> {
147 let out = self.inner.process_impl(ctx.clone(), pin, value).await?;
148 if out.len() != 3 {
149 return Ok(());
150 }
151 self.try_output(ctx.clone(), PIN_OUT1, out[0].clone())?;
152 self.try_output(ctx.clone(), PIN_OUT2, out[1].clone())?;
153 self.try_output(ctx, PIN_OUT3, out[2].clone())?;
154
155 Ok(())
156 }
157}
158
// Four-input sync agent: declared with title "Sync4" under `CATEGORY`,
// input pins `in1`..`in4` and output pins `out1`..`out4`.
#[askit_agent(
    title = "Sync4",
    category = CATEGORY,
    inputs = [PIN_IN1, PIN_IN2, PIN_IN3, PIN_IN4],
    outputs = [PIN_OUT1, PIN_OUT2, PIN_OUT3, PIN_OUT4],
)]
struct Sync4Agent {
    // Built by `AgentData::new` from the constructor arguments.
    data: AgentData,
    // Holds the values received on the input pins until a full set is present.
    inner: SyncAgent,
}
169
170#[async_trait]
171impl AsAgent for Sync4Agent {
172 fn new(
173 askit: ASKit,
174 id: String,
175 def_name: String,
176 config: Option<AgentConfigs>,
177 ) -> Result<Self, AgentError> {
178 let data = AgentData::new(askit, id, def_name, config);
179 let inner = SyncAgent::new_with_n(3);
180 Ok(Self { data, inner })
181 }
182
183 async fn process(
184 &mut self,
185 ctx: AgentContext,
186 pin: String,
187 value: AgentValue,
188 ) -> Result<(), AgentError> {
189 let out = self.inner.process_impl(ctx.clone(), pin, value).await?;
190 if out.len() != 4 {
191 return Ok(());
192 }
193 self.try_output(ctx.clone(), PIN_OUT1, out[0].clone())?;
194 self.try_output(ctx.clone(), PIN_OUT2, out[1].clone())?;
195 self.try_output(ctx.clone(), PIN_OUT3, out[2].clone())?;
196 self.try_output(ctx, PIN_OUT4, out[3].clone())?;
197
198 Ok(())
199 }
200}