1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4use std::vec;
5
6use agent_stream_kit::{
7 ASKit, Agent, AgentContext, AgentData, AgentError, AgentOutput, AgentSpec, AgentStatus,
8 AgentValue, AsAgent, askit_agent, async_trait,
9};
10use chrono::{DateTime, Local, Utc};
11use cron::Schedule;
12use log;
13use regex::Regex;
14use tokio::task::JoinHandle;
15
/// Category under which every agent defined in this file is registered.
static CATEGORY: &str = "Std/Time";

// Output pin names.
static PIN_TIME: &str = "time";
static PIN_UNIT: &str = "unit";

// Configuration keys.
static CONFIG_DELAY: &str = "delay";
static CONFIG_MAX_NUM_DATA: &str = "max_num_data";
static CONFIG_INTERVAL: &str = "interval";
static CONFIG_SCHEDULE: &str = "schedule";
static CONFIG_TIME: &str = "time";

// Default configuration values.
/// Default delay, in milliseconds.
const DELAY_MS_DEFAULT: i64 = 1000;
/// Default cap on the number of inputs a delay agent keeps waiting.
const MAX_NUM_DATA_DEFAULT: i64 = 10;
/// Default interval of the interval timer, in duration-string form.
static INTERVAL_DEFAULT: &str = "10s";
/// Default throttle period, in duration-string form.
static TIME_DEFAULT: &str = "1s";
31
32#[askit_agent(
34 title = "Delay",
35 description = "Delays output by a specified time",
36 category = CATEGORY,
37 inputs = ["*"],
38 outputs = ["*"],
39 integer_config(name = CONFIG_DELAY, default = DELAY_MS_DEFAULT, title = "delay (ms)"),
40 integer_config(name = CONFIG_MAX_NUM_DATA, default = MAX_NUM_DATA_DEFAULT, title = "max num data")
41)]
42struct DelayAgent {
43 data: AgentData,
44 num_waiting_data: Arc<Mutex<i64>>,
45}
46
47#[async_trait]
48impl AsAgent for DelayAgent {
49 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
50 Ok(Self {
51 data: AgentData::new(askit, id, spec),
52 num_waiting_data: Arc::new(Mutex::new(0)),
53 })
54 }
55
56 async fn process(
57 &mut self,
58 ctx: AgentContext,
59 pin: String,
60 value: AgentValue,
61 ) -> Result<(), AgentError> {
62 let config = self.configs()?;
63 let delay_ms = config.get_integer_or(CONFIG_DELAY, DELAY_MS_DEFAULT);
64 let max_num_data = config.get_integer_or(CONFIG_MAX_NUM_DATA, MAX_NUM_DATA_DEFAULT);
65
66 {
68 let num_waiting_data = self.num_waiting_data.clone();
69 let mut num_waiting_data = num_waiting_data.lock().unwrap();
70 if *num_waiting_data >= max_num_data {
71 return Ok(());
72 }
73 *num_waiting_data += 1;
74 }
75
76 tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
77
78 self.try_output(ctx.clone(), pin, value.clone())?;
79
80 let mut num_waiting_data = self.num_waiting_data.lock().unwrap();
81 *num_waiting_data -= 1;
82
83 Ok(())
84 }
85}
86
87#[askit_agent(
89 title = "Interval Timer",
90 description = "Outputs a unit signal at specified intervals",
91 category = CATEGORY,
92 outputs = [PIN_UNIT],
93 string_config(name = CONFIG_INTERVAL, default = INTERVAL_DEFAULT, description = "(ex. 10s, 5m, 100ms, 1h, 1d)")
94)]
95struct IntervalTimerAgent {
96 data: AgentData,
97 timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
98 interval_ms: u64,
99}
100
101impl IntervalTimerAgent {
102 fn start_timer(&mut self) -> Result<(), AgentError> {
103 let timer_handle = self.timer_handle.clone();
104 let interval_ms = self.interval_ms;
105
106 let askit = self.askit().clone();
107 let agent_id = self.id().to_string();
108 let handle = self.runtime().spawn(async move {
109 loop {
110 tokio::time::sleep(tokio::time::Duration::from_millis(interval_ms)).await;
112
113 if let Ok(handle) = timer_handle.lock() {
115 if handle.is_none() {
116 break;
117 }
118 }
119
120 if let Err(e) = askit.try_send_agent_out(
122 agent_id.clone(),
123 AgentContext::new(),
124 PIN_UNIT.to_string(),
125 AgentValue::unit(),
126 ) {
127 log::error!("Failed to send interval timer output: {}", e);
128 }
129 }
130 });
131
132 if let Ok(mut timer_handle) = self.timer_handle.lock() {
134 *timer_handle = Some(handle);
135 }
136
137 Ok(())
138 }
139
140 fn stop_timer(&mut self) -> Result<(), AgentError> {
141 if let Ok(mut timer_handle) = self.timer_handle.lock() {
143 if let Some(handle) = timer_handle.take() {
144 handle.abort();
145 }
146 }
147 Ok(())
148 }
149}
150
151#[async_trait]
152impl AsAgent for IntervalTimerAgent {
153 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
154 let interval = spec
155 .configs
156 .as_ref()
157 .ok_or(AgentError::NoConfig)?
158 .get_string_or(CONFIG_INTERVAL, INTERVAL_DEFAULT);
159 let interval_ms = parse_duration_to_ms(&interval)?;
160
161 Ok(Self {
162 data: AgentData::new(askit, id, spec),
163 timer_handle: Default::default(),
164 interval_ms,
165 })
166 }
167
168 async fn start(&mut self) -> Result<(), AgentError> {
169 self.start_timer()
170 }
171
172 async fn stop(&mut self) -> Result<(), AgentError> {
173 self.stop_timer()
174 }
175
176 fn configs_changed(&mut self) -> Result<(), AgentError> {
177 let interval = self.configs()?.get_string(CONFIG_INTERVAL)?;
179 let new_interval = parse_duration_to_ms(&interval)?;
180 if new_interval != self.interval_ms {
181 self.interval_ms = new_interval;
182 if *self.status() == AgentStatus::Start {
183 self.stop_timer()?;
185 self.start_timer()?;
186 }
187 }
188 Ok(())
189 }
190}
191
192#[askit_agent(
194 title = "On Start",
195 category = CATEGORY,
196 outputs = [PIN_UNIT],
197 integer_config(name = CONFIG_DELAY, default = DELAY_MS_DEFAULT, title = "delay (ms)")
198)]
199struct OnStartAgent {
200 data: AgentData,
201}
202
203#[async_trait]
204impl AsAgent for OnStartAgent {
205 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
206 Ok(Self {
207 data: AgentData::new(askit, id, spec),
208 })
209 }
210
211 async fn start(&mut self) -> Result<(), AgentError> {
212 let config = self.configs()?;
213 let delay_ms = config.get_integer_or(CONFIG_DELAY, DELAY_MS_DEFAULT);
214
215 let askit = self.askit().clone();
216 let agent_id = self.id().to_string();
217
218 self.runtime().spawn(async move {
219 tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
220
221 if let Err(e) = askit.try_send_agent_out(
222 agent_id,
223 AgentContext::new(),
224 PIN_UNIT.to_string(),
225 AgentValue::unit(),
226 ) {
227 log::error!("Failed to send delayed output: {}", e);
228 }
229 });
230
231 Ok(())
232 }
233}
234
235#[askit_agent(
237 title = "Schedule Timer",
238 category = CATEGORY,
239 outputs = [PIN_TIME],
240 string_config(name = CONFIG_SCHEDULE, default = "0 0 * * * *", description = "sec min hour day month week year")
241)]
242struct ScheduleTimerAgent {
243 data: AgentData,
244 cron_schedule: Option<Schedule>,
245 timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
246}
247
248impl ScheduleTimerAgent {
249 fn start_timer(&mut self) -> Result<(), AgentError> {
250 let Some(schedule) = &self.cron_schedule else {
251 return Err(AgentError::InvalidConfig("No schedule defined".into()));
252 };
253
254 let askit = self.askit().clone();
255 let agent_id = self.id().to_string();
256 let timer_handle = self.timer_handle.clone();
257 let schedule = schedule.clone();
258
259 let handle = self.runtime().spawn(async move {
260 loop {
261 let now: DateTime<Utc> = Utc::now();
263 let next = match schedule.upcoming(Utc).next() {
264 Some(next_time) => next_time,
265 None => {
266 log::error!("No upcoming schedule times found");
267 break;
268 }
269 };
270
271 let duration = match (next - now).to_std() {
273 Ok(duration) => duration,
274 Err(e) => {
275 log::error!("Failed to calculate duration until next schedule: {}", e);
276 tokio::time::sleep(Duration::from_secs(60)).await;
278 continue;
279 }
280 };
281
282 let next_local = next.with_timezone(&Local);
283 log::debug!(
284 "Scheduling timer for '{}' to fire at {} (in {:?})",
285 agent_id,
286 next_local.format("%Y-%m-%d %H:%M:%S %z"),
287 duration
288 );
289
290 tokio::time::sleep(duration).await;
292
293 if let Ok(handle) = timer_handle.lock() {
295 if handle.is_none() {
296 break;
297 }
298 }
299
300 let current_local_time = Local::now().timestamp();
302
303 if let Err(e) = askit.try_send_agent_out(
305 agent_id.clone(),
306 AgentContext::new(),
307 PIN_TIME.to_string(),
308 AgentValue::integer(current_local_time),
309 ) {
310 log::error!("Failed to send schedule timer output: {}", e);
311 }
312 }
313 });
314
315 if let Ok(mut timer_handle) = self.timer_handle.lock() {
317 *timer_handle = Some(handle);
318 }
319
320 Ok(())
321 }
322
323 fn stop_timer(&mut self) -> Result<(), AgentError> {
324 if let Ok(mut timer_handle) = self.timer_handle.lock() {
326 if let Some(handle) = timer_handle.take() {
327 handle.abort();
328 }
329 }
330 Ok(())
331 }
332
333 fn parse_schedule(&mut self, schedule_str: &str) -> Result<(), AgentError> {
334 if schedule_str.trim().is_empty() {
335 self.cron_schedule = None;
336 return Ok(());
337 }
338
339 let schedule = Schedule::from_str(schedule_str).map_err(|e| {
340 AgentError::InvalidConfig(format!("Invalid cron schedule '{}': {}", schedule_str, e))
341 })?;
342 self.cron_schedule = Some(schedule);
343 Ok(())
344 }
345}
346
347#[async_trait]
348impl AsAgent for ScheduleTimerAgent {
349 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
350 let schedule_str = spec
351 .configs
352 .as_ref()
353 .map(|cfg| cfg.get_string(CONFIG_SCHEDULE))
354 .transpose()?;
355
356 let mut agent = Self {
357 data: AgentData::new(askit, id, spec),
358 cron_schedule: None,
359 timer_handle: Default::default(),
360 };
361
362 if let Some(schedule_str) = schedule_str {
363 if !schedule_str.is_empty() {
364 agent.parse_schedule(&schedule_str)?;
365 }
366 }
367
368 Ok(agent)
369 }
370
371 async fn start(&mut self) -> Result<(), AgentError> {
372 if self.cron_schedule.is_some() {
373 self.start_timer()?;
374 }
375 Ok(())
376 }
377
378 async fn stop(&mut self) -> Result<(), AgentError> {
379 self.stop_timer()
380 }
381
382 fn configs_changed(&mut self) -> Result<(), AgentError> {
383 let schedule_str = self.configs()?.get_string(CONFIG_SCHEDULE)?;
385 self.parse_schedule(&schedule_str)?;
386
387 if *self.status() == AgentStatus::Start {
388 self.stop_timer()?;
390 if self.cron_schedule.is_some() {
391 self.start_timer()?;
392 }
393 }
394 Ok(())
395 }
396}
397
398#[askit_agent(
400 title = "Throttle Time",
401 category = CATEGORY,
402 inputs = ["*"],
403 outputs = ["*"],
404 string_config(name = CONFIG_TIME, default = TIME_DEFAULT, description = "(ex. 10s, 5m, 100ms, 1h, 1d)"),
405 integer_config(name = CONFIG_MAX_NUM_DATA, title = "max num data", description = "0: no data, -1: all data")
406)]
407struct ThrottleTimeAgent {
408 data: AgentData,
409 timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
410 time_ms: u64,
411 max_num_data: i64,
412 waiting_data: Arc<Mutex<Vec<(AgentContext, String, AgentValue)>>>,
413}
414
415impl ThrottleTimeAgent {
416 fn start_timer(&mut self) -> Result<(), AgentError> {
417 let timer_handle = self.timer_handle.clone();
418 let time_ms = self.time_ms;
419
420 let waiting_data = self.waiting_data.clone();
421 let askit = self.askit().clone();
422 let agent_id = self.id().to_string();
423
424 let handle = self.runtime().spawn(async move {
425 loop {
426 tokio::time::sleep(tokio::time::Duration::from_millis(time_ms)).await;
428
429 let mut handle = timer_handle.lock().unwrap();
431 if handle.is_none() {
432 break;
433 }
434
435 let mut wd = waiting_data.lock().unwrap();
437 if wd.len() > 0 {
438 let (ctx, pin, data) = wd.remove(0);
440 askit
441 .try_send_agent_out(agent_id.clone(), ctx, pin, data)
442 .unwrap_or_else(|e| {
443 log::error!("Failed to send delayed output: {}", e);
444 });
445 }
446
447 if wd.len() == 0 {
449 handle.take();
450 break;
451 }
452 }
453 });
454
455 if let Ok(mut timer_handle) = self.timer_handle.lock() {
457 *timer_handle = Some(handle);
458 }
459
460 Ok(())
461 }
462
463 fn stop_timer(&mut self) -> Result<(), AgentError> {
464 if let Ok(mut timer_handle) = self.timer_handle.lock() {
466 if let Some(handle) = timer_handle.take() {
467 handle.abort();
468 }
469 }
470 Ok(())
471 }
472}
473
474#[async_trait]
475impl AsAgent for ThrottleTimeAgent {
476 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
477 let time = spec
478 .configs
479 .as_ref()
480 .ok_or(AgentError::NoConfig)?
481 .get_string_or(CONFIG_TIME, TIME_DEFAULT);
482 let time_ms = parse_duration_to_ms(&time)?;
483
484 let max_num_data = spec
485 .configs
486 .as_ref()
487 .ok_or(AgentError::NoConfig)?
488 .get_integer_or(CONFIG_MAX_NUM_DATA, 0);
489
490 Ok(Self {
491 data: AgentData::new(askit, id, spec),
492 timer_handle: Default::default(),
493 time_ms,
494 max_num_data,
495 waiting_data: Arc::new(Mutex::new(vec![])),
496 })
497 }
498
499 async fn stop(&mut self) -> Result<(), AgentError> {
500 self.stop_timer()
501 }
502
503 fn configs_changed(&mut self) -> Result<(), AgentError> {
504 let time = self.configs()?.get_string(CONFIG_TIME)?;
506 let new_time = parse_duration_to_ms(&time)?;
507 if new_time != self.time_ms {
508 self.time_ms = new_time;
509 }
510
511 let max_num_data = self.configs()?.get_integer(CONFIG_MAX_NUM_DATA)?;
513 if self.max_num_data != max_num_data {
514 let mut wd = self.waiting_data.lock().unwrap();
515 let wd_len = wd.len();
516 if max_num_data >= 0 && wd_len > (max_num_data as usize) {
517 wd.drain(0..(wd_len - (max_num_data as usize)));
519 }
520 self.max_num_data = max_num_data;
521 }
522 Ok(())
523 }
524
525 async fn process(
526 &mut self,
527 ctx: AgentContext,
528 pin: String,
529 value: AgentValue,
530 ) -> Result<(), AgentError> {
531 if self.timer_handle.lock().unwrap().is_some() {
532 let mut wd = self.waiting_data.lock().unwrap();
534
535 if self.max_num_data == 0 {
537 return Ok(());
538 }
539
540 wd.push((ctx, pin, value));
541 if self.max_num_data > 0 && wd.len() > self.max_num_data as usize {
542 wd.remove(0);
544 }
545
546 return Ok(());
547 }
548
549 self.start_timer()?;
551
552 self.try_output(ctx, pin, value)?;
554
555 Ok(())
556 }
557}
558
559fn parse_duration_to_ms(duration_str: &str) -> Result<u64, AgentError> {
561 const MIN_DURATION: u64 = 10;
562
563 let re = Regex::new(r"^(\d+)(?:([a-zA-Z]+))?$").expect("Failed to compile regex");
565
566 if let Some(captures) = re.captures(duration_str.trim()) {
567 let value: u64 = captures.get(1).unwrap().as_str().parse().map_err(|e| {
568 AgentError::InvalidConfig(format!(
569 "Invalid number in duration '{}': {}",
570 duration_str, e
571 ))
572 })?;
573
574 let unit = captures
576 .get(2)
577 .map_or("s".to_string(), |m| m.as_str().to_lowercase());
578
579 let milliseconds = match unit.as_str() {
581 "ms" => value, "s" => value * 1000, "m" => value * 60 * 1000, "h" => value * 3600 * 1000, "d" => value * 86400 * 1000, _ => {
587 return Err(AgentError::InvalidConfig(format!(
588 "Unknown time unit: {}",
589 unit
590 )));
591 }
592 };
593
594 Ok(std::cmp::max(milliseconds, MIN_DURATION))
596 } else {
597 let value: u64 = duration_str.parse().map_err(|e| {
600 AgentError::InvalidConfig(format!("Invalid duration format '{}': {}", duration_str, e))
601 })?;
602 Ok(std::cmp::max(value * 1000, MIN_DURATION)) }
604}