1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4use std::vec;
5
6use agent_stream_kit::{
7 ASKit, Agent, AgentConfigs, AgentContext, AgentError, AgentOutput, AgentStatus, AgentValue,
8 AsAgent, AgentData, async_trait,
9};
10use askit_macros::askit_agent;
11use chrono::{DateTime, Local, Utc};
12use cron::Schedule;
13use log;
14use regex::Regex;
15use tokio::task::JoinHandle;
16
17static CATEGORY: &str = "Std/Time";
18
19static PIN_TIME: &str = "time";
20static PIN_UNIT: &str = "unit";
21
22static CONFIG_DELAY: &str = "delay";
23static CONFIG_MAX_NUM_DATA: &str = "max_num_data";
24static CONFIG_INTERVAL: &str = "interval";
25static CONFIG_SCHEDULE: &str = "schedule";
26static CONFIG_TIME: &str = "time";
27
28const DELAY_MS_DEFAULT: i64 = 1000; const MAX_NUM_DATA_DEFAULT: i64 = 10;
30static INTERVAL_DEFAULT: &str = "10s";
31static TIME_DEFAULT: &str = "1s";
32
33#[askit_agent(
35 title = "Delay",
36 description = "Delays output by a specified time",
37 category = CATEGORY,
38 inputs = ["*"],
39 outputs = ["*"],
40 integer_config(name = CONFIG_DELAY, default = DELAY_MS_DEFAULT, title = "delay (ms)"),
41 integer_config(name = CONFIG_MAX_NUM_DATA, default = MAX_NUM_DATA_DEFAULT, title = "max num data")
42)]
43struct DelayAgent {
44 data: AgentData,
45 num_waiting_data: Arc<Mutex<i64>>,
46}
47
48#[async_trait]
49impl AsAgent for DelayAgent {
50 fn new(
51 askit: ASKit,
52 id: String,
53 def_name: String,
54 config: Option<AgentConfigs>,
55 ) -> Result<Self, AgentError> {
56 Ok(Self {
57 data: AgentData::new(askit, id, def_name, config),
58 num_waiting_data: Arc::new(Mutex::new(0)),
59 })
60 }
61
62 async fn process(
63 &mut self,
64 ctx: AgentContext,
65 pin: String,
66 value: AgentValue,
67 ) -> Result<(), AgentError> {
68 let config = self.configs()?;
69 let delay_ms = config.get_integer_or(CONFIG_DELAY, DELAY_MS_DEFAULT);
70 let max_num_data = config.get_integer_or(CONFIG_MAX_NUM_DATA, MAX_NUM_DATA_DEFAULT);
71
72 {
74 let num_waiting_data = self.num_waiting_data.clone();
75 let mut num_waiting_data = num_waiting_data.lock().unwrap();
76 if *num_waiting_data >= max_num_data {
77 return Ok(());
78 }
79 *num_waiting_data += 1;
80 }
81
82 tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
83
84 self.try_output(ctx.clone(), pin, value.clone())?;
85
86 let mut num_waiting_data = self.num_waiting_data.lock().unwrap();
87 *num_waiting_data -= 1;
88
89 Ok(())
90 }
91}
92
93#[askit_agent(
95 title = "Interval Timer",
96 description = "Outputs a unit signal at specified intervals",
97 category = CATEGORY,
98 outputs = [PIN_UNIT],
99 string_config(name = CONFIG_INTERVAL, default = INTERVAL_DEFAULT, description = "(ex. 10s, 5m, 100ms, 1h, 1d)")
100)]
101struct IntervalTimerAgent {
102 data: AgentData,
103 timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
104 interval_ms: u64,
105}
106
107impl IntervalTimerAgent {
108 fn start_timer(&mut self) -> Result<(), AgentError> {
109 let timer_handle = self.timer_handle.clone();
110 let interval_ms = self.interval_ms;
111
112 let askit = self.askit().clone();
113 let agent_id = self.id().to_string();
114 let handle = self.runtime().spawn(async move {
115 loop {
116 tokio::time::sleep(tokio::time::Duration::from_millis(interval_ms)).await;
118
119 if let Ok(handle) = timer_handle.lock() {
121 if handle.is_none() {
122 break;
123 }
124 }
125
126 if let Err(e) = askit.try_send_agent_out(
128 agent_id.clone(),
129 AgentContext::new(),
130 PIN_UNIT.to_string(),
131 AgentValue::unit(),
132 ) {
133 log::error!("Failed to send interval timer output: {}", e);
134 }
135 }
136 });
137
138 if let Ok(mut timer_handle) = self.timer_handle.lock() {
140 *timer_handle = Some(handle);
141 }
142
143 Ok(())
144 }
145
146 fn stop_timer(&mut self) -> Result<(), AgentError> {
147 if let Ok(mut timer_handle) = self.timer_handle.lock() {
149 if let Some(handle) = timer_handle.take() {
150 handle.abort();
151 }
152 }
153 Ok(())
154 }
155}
156
157#[async_trait]
158impl AsAgent for IntervalTimerAgent {
159 fn new(
160 askit: ASKit,
161 id: String,
162 def_name: String,
163 config: Option<AgentConfigs>,
164 ) -> Result<Self, AgentError> {
165 let interval = config
166 .as_ref()
167 .ok_or(AgentError::NoConfig)?
168 .get_string_or(CONFIG_INTERVAL, INTERVAL_DEFAULT);
169 let interval_ms = parse_duration_to_ms(&interval)?;
170
171 Ok(Self {
172 data: AgentData::new(askit, id, def_name, config),
173 timer_handle: Default::default(),
174 interval_ms,
175 })
176 }
177
178 async fn start(&mut self) -> Result<(), AgentError> {
179 self.start_timer()
180 }
181
182 async fn stop(&mut self) -> Result<(), AgentError> {
183 self.stop_timer()
184 }
185
186 fn configs_changed(&mut self) -> Result<(), AgentError> {
187 let interval = self.configs()?.get_string(CONFIG_INTERVAL)?;
189 let new_interval = parse_duration_to_ms(&interval)?;
190 if new_interval != self.interval_ms {
191 self.interval_ms = new_interval;
192 if *self.status() == AgentStatus::Start {
193 self.stop_timer()?;
195 self.start_timer()?;
196 }
197 }
198 Ok(())
199 }
200}
201
202#[askit_agent(
204 title = "On Start",
205 category = CATEGORY,
206 outputs = [PIN_UNIT],
207 integer_config(name = CONFIG_DELAY, default = DELAY_MS_DEFAULT, title = "delay (ms)")
208)]
209struct OnStartAgent {
210 data: AgentData,
211}
212
213#[async_trait]
214impl AsAgent for OnStartAgent {
215 fn new(
216 askit: ASKit,
217 id: String,
218 def_name: String,
219 config: Option<AgentConfigs>,
220 ) -> Result<Self, AgentError> {
221 Ok(Self {
222 data: AgentData::new(askit, id, def_name, config),
223 })
224 }
225
226 async fn start(&mut self) -> Result<(), AgentError> {
227 let config = self.configs()?;
228 let delay_ms = config.get_integer_or(CONFIG_DELAY, DELAY_MS_DEFAULT);
229
230 let askit = self.askit().clone();
231 let agent_id = self.id().to_string();
232
233 self.runtime().spawn(async move {
234 tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
235
236 if let Err(e) = askit.try_send_agent_out(
237 agent_id,
238 AgentContext::new(),
239 PIN_UNIT.to_string(),
240 AgentValue::unit(),
241 ) {
242 log::error!("Failed to send delayed output: {}", e);
243 }
244 });
245
246 Ok(())
247 }
248}
249
250#[askit_agent(
252 title = "Schedule Timer",
253 category = CATEGORY,
254 outputs = [PIN_TIME],
255 string_config(name = CONFIG_SCHEDULE, default = "0 0 * * * *", description = "sec min hour day month week year")
256)]
257struct ScheduleTimerAgent {
258 data: AgentData,
259 cron_schedule: Option<Schedule>,
260 timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
261}
262
263impl ScheduleTimerAgent {
264 fn start_timer(&mut self) -> Result<(), AgentError> {
265 let Some(schedule) = &self.cron_schedule else {
266 return Err(AgentError::InvalidConfig("No schedule defined".into()));
267 };
268
269 let askit = self.askit().clone();
270 let agent_id = self.id().to_string();
271 let timer_handle = self.timer_handle.clone();
272 let schedule = schedule.clone();
273
274 let handle = self.runtime().spawn(async move {
275 loop {
276 let now: DateTime<Utc> = Utc::now();
278 let next = match schedule.upcoming(Utc).next() {
279 Some(next_time) => next_time,
280 None => {
281 log::error!("No upcoming schedule times found");
282 break;
283 }
284 };
285
286 let duration = match (next - now).to_std() {
288 Ok(duration) => duration,
289 Err(e) => {
290 log::error!("Failed to calculate duration until next schedule: {}", e);
291 tokio::time::sleep(Duration::from_secs(60)).await;
293 continue;
294 }
295 };
296
297 let next_local = next.with_timezone(&Local);
298 log::debug!(
299 "Scheduling timer for '{}' to fire at {} (in {:?})",
300 agent_id,
301 next_local.format("%Y-%m-%d %H:%M:%S %z"),
302 duration
303 );
304
305 tokio::time::sleep(duration).await;
307
308 if let Ok(handle) = timer_handle.lock() {
310 if handle.is_none() {
311 break;
312 }
313 }
314
315 let current_local_time = Local::now().timestamp();
317
318 if let Err(e) = askit.try_send_agent_out(
320 agent_id.clone(),
321 AgentContext::new(),
322 PIN_TIME.to_string(),
323 AgentValue::integer(current_local_time),
324 ) {
325 log::error!("Failed to send schedule timer output: {}", e);
326 }
327 }
328 });
329
330 if let Ok(mut timer_handle) = self.timer_handle.lock() {
332 *timer_handle = Some(handle);
333 }
334
335 Ok(())
336 }
337
338 fn stop_timer(&mut self) -> Result<(), AgentError> {
339 if let Ok(mut timer_handle) = self.timer_handle.lock() {
341 if let Some(handle) = timer_handle.take() {
342 handle.abort();
343 }
344 }
345 Ok(())
346 }
347
348 fn parse_schedule(&mut self, schedule_str: &str) -> Result<(), AgentError> {
349 if schedule_str.trim().is_empty() {
350 self.cron_schedule = None;
351 return Ok(());
352 }
353
354 let schedule = Schedule::from_str(schedule_str).map_err(|e| {
355 AgentError::InvalidConfig(format!("Invalid cron schedule '{}': {}", schedule_str, e))
356 })?;
357 self.cron_schedule = Some(schedule);
358 Ok(())
359 }
360}
361
362#[async_trait]
363impl AsAgent for ScheduleTimerAgent {
364 fn new(
365 askit: ASKit,
366 id: String,
367 def_name: String,
368 config: Option<AgentConfigs>,
369 ) -> Result<Self, AgentError> {
370 let mut agent = Self {
371 data: AgentData::new(askit, id, def_name, config.clone()),
372 cron_schedule: None,
373 timer_handle: Default::default(),
374 };
375
376 if let Some(config) = config {
377 let schedule_str = config.get_string(CONFIG_SCHEDULE)?;
378 if !schedule_str.is_empty() {
379 agent.parse_schedule(&schedule_str)?;
380 }
381 }
382
383 Ok(agent)
384 }
385
386 async fn start(&mut self) -> Result<(), AgentError> {
387 if self.cron_schedule.is_some() {
388 self.start_timer()?;
389 }
390 Ok(())
391 }
392
393 async fn stop(&mut self) -> Result<(), AgentError> {
394 self.stop_timer()
395 }
396
397 fn configs_changed(&mut self) -> Result<(), AgentError> {
398 let schedule_str = self.configs()?.get_string(CONFIG_SCHEDULE)?;
400 self.parse_schedule(&schedule_str)?;
401
402 if *self.status() == AgentStatus::Start {
403 self.stop_timer()?;
405 if self.cron_schedule.is_some() {
406 self.start_timer()?;
407 }
408 }
409 Ok(())
410 }
411}
412
413#[askit_agent(
415 title = "Throttle Time",
416 category = CATEGORY,
417 inputs = ["*"],
418 outputs = ["*"],
419 string_config(name = CONFIG_TIME, default = TIME_DEFAULT, description = "(ex. 10s, 5m, 100ms, 1h, 1d)"),
420 integer_config(name = CONFIG_MAX_NUM_DATA, title = "max num data", description = "0: no data, -1: all data")
421)]
422struct ThrottleTimeAgent {
423 data: AgentData,
424 timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
425 time_ms: u64,
426 max_num_data: i64,
427 waiting_data: Arc<Mutex<Vec<(AgentContext, String, AgentValue)>>>,
428}
429
430impl ThrottleTimeAgent {
431 fn start_timer(&mut self) -> Result<(), AgentError> {
432 let timer_handle = self.timer_handle.clone();
433 let time_ms = self.time_ms;
434
435 let waiting_data = self.waiting_data.clone();
436 let askit = self.askit().clone();
437 let agent_id = self.id().to_string();
438
439 let handle = self.runtime().spawn(async move {
440 loop {
441 tokio::time::sleep(tokio::time::Duration::from_millis(time_ms)).await;
443
444 let mut handle = timer_handle.lock().unwrap();
446 if handle.is_none() {
447 break;
448 }
449
450 let mut wd = waiting_data.lock().unwrap();
452 if wd.len() > 0 {
453 let (ctx, pin, data) = wd.remove(0);
455 askit
456 .try_send_agent_out(agent_id.clone(), ctx, pin, data)
457 .unwrap_or_else(|e| {
458 log::error!("Failed to send delayed output: {}", e);
459 });
460 }
461
462 if wd.len() == 0 {
464 handle.take();
465 break;
466 }
467 }
468 });
469
470 if let Ok(mut timer_handle) = self.timer_handle.lock() {
472 *timer_handle = Some(handle);
473 }
474
475 Ok(())
476 }
477
478 fn stop_timer(&mut self) -> Result<(), AgentError> {
479 if let Ok(mut timer_handle) = self.timer_handle.lock() {
481 if let Some(handle) = timer_handle.take() {
482 handle.abort();
483 }
484 }
485 Ok(())
486 }
487}
488
489#[async_trait]
490impl AsAgent for ThrottleTimeAgent {
491 fn new(
492 askit: ASKit,
493 id: String,
494 def_name: String,
495 config: Option<AgentConfigs>,
496 ) -> Result<Self, AgentError> {
497 let time = config
498 .as_ref()
499 .ok_or(AgentError::NoConfig)?
500 .get_string_or(CONFIG_TIME, TIME_DEFAULT);
501 let time_ms = parse_duration_to_ms(&time)?;
502
503 let max_num_data = config
504 .as_ref()
505 .ok_or(AgentError::NoConfig)?
506 .get_integer_or(CONFIG_MAX_NUM_DATA, 0);
507
508 Ok(Self {
509 data: AgentData::new(askit, id, def_name, config),
510 timer_handle: Default::default(),
511 time_ms,
512 max_num_data,
513 waiting_data: Arc::new(Mutex::new(vec![])),
514 })
515 }
516
517 async fn stop(&mut self) -> Result<(), AgentError> {
518 self.stop_timer()
519 }
520
521 fn configs_changed(&mut self) -> Result<(), AgentError> {
522 let time = self.configs()?.get_string(CONFIG_TIME)?;
524 let new_time = parse_duration_to_ms(&time)?;
525 if new_time != self.time_ms {
526 self.time_ms = new_time;
527 }
528
529 let max_num_data = self.configs()?.get_integer(CONFIG_MAX_NUM_DATA)?;
531 if self.max_num_data != max_num_data {
532 let mut wd = self.waiting_data.lock().unwrap();
533 let wd_len = wd.len();
534 if max_num_data >= 0 && wd_len > (max_num_data as usize) {
535 wd.drain(0..(wd_len - (max_num_data as usize)));
537 }
538 self.max_num_data = max_num_data;
539 }
540 Ok(())
541 }
542
543 async fn process(
544 &mut self,
545 ctx: AgentContext,
546 pin: String,
547 value: AgentValue,
548 ) -> Result<(), AgentError> {
549 if self.timer_handle.lock().unwrap().is_some() {
550 let mut wd = self.waiting_data.lock().unwrap();
552
553 if self.max_num_data == 0 {
555 return Ok(());
556 }
557
558 wd.push((ctx, pin, value));
559 if self.max_num_data > 0 && wd.len() > self.max_num_data as usize {
560 wd.remove(0);
562 }
563
564 return Ok(());
565 }
566
567 self.start_timer()?;
569
570 self.try_output(ctx, pin, value)?;
572
573 Ok(())
574 }
575}
576
577fn parse_duration_to_ms(duration_str: &str) -> Result<u64, AgentError> {
579 const MIN_DURATION: u64 = 10;
580
581 let re = Regex::new(r"^(\d+)(?:([a-zA-Z]+))?$").expect("Failed to compile regex");
583
584 if let Some(captures) = re.captures(duration_str.trim()) {
585 let value: u64 = captures.get(1).unwrap().as_str().parse().map_err(|e| {
586 AgentError::InvalidConfig(format!(
587 "Invalid number in duration '{}': {}",
588 duration_str, e
589 ))
590 })?;
591
592 let unit = captures
594 .get(2)
595 .map_or("s".to_string(), |m| m.as_str().to_lowercase());
596
597 let milliseconds = match unit.as_str() {
599 "ms" => value, "s" => value * 1000, "m" => value * 60 * 1000, "h" => value * 3600 * 1000, "d" => value * 86400 * 1000, _ => {
605 return Err(AgentError::InvalidConfig(format!(
606 "Unknown time unit: {}",
607 unit
608 )));
609 }
610 };
611
612 Ok(std::cmp::max(milliseconds, MIN_DURATION))
614 } else {
615 let value: u64 = duration_str.parse().map_err(|e| {
618 AgentError::InvalidConfig(format!("Invalid duration format '{}': {}", duration_str, e))
619 })?;
620 Ok(std::cmp::max(value * 1000, MIN_DURATION)) }
622}