// askit_std_agents/time.rs

use std::str::FromStr;
use std::sync::{Arc, Mutex, OnceLock};
use std::time::Duration;
use std::vec;

use agent_stream_kit::{
    ASKit, Agent, AgentConfigs, AgentContext, AgentError, AgentOutput, AgentStatus, AgentValue,
    AsAgent, AgentData, async_trait,
};
use askit_macros::askit_agent;
use chrono::{DateTime, Local, Utc};
use cron::Schedule;
use log;
use regex::Regex;
use tokio::task::JoinHandle;
16
// Category string shared by every agent declared in this file.
static CATEGORY: &str = "Std/Time";

// Output pin names.
static PIN_TIME: &str = "time";
static PIN_UNIT: &str = "unit";

// Configuration keys read by the agents below.
static CONFIG_DELAY: &str = "delay";
static CONFIG_MAX_NUM_DATA: &str = "max_num_data";
static CONFIG_INTERVAL: &str = "interval";
static CONFIG_SCHEDULE: &str = "schedule";
static CONFIG_TIME: &str = "time";

// Fallback values used when a configuration key is absent.
const DELAY_MS_DEFAULT: i64 = 1000; // one second, expressed in milliseconds
const MAX_NUM_DATA_DEFAULT: i64 = 10;
static INTERVAL_DEFAULT: &str = "10s";
static TIME_DEFAULT: &str = "1s";
32
33// Delay Agent
34#[askit_agent(
35    title = "Delay",
36    description = "Delays output by a specified time",
37    category = CATEGORY,
38    inputs = ["*"],
39    outputs = ["*"],
40    integer_config(name = CONFIG_DELAY, default = DELAY_MS_DEFAULT, title = "delay (ms)"),
41    integer_config(name = CONFIG_MAX_NUM_DATA, default = MAX_NUM_DATA_DEFAULT, title = "max num data")
42)]
43struct DelayAgent {
44    data: AgentData,
45    num_waiting_data: Arc<Mutex<i64>>,
46}
47
48#[async_trait]
49impl AsAgent for DelayAgent {
50    fn new(
51        askit: ASKit,
52        id: String,
53        def_name: String,
54        config: Option<AgentConfigs>,
55    ) -> Result<Self, AgentError> {
56        Ok(Self {
57            data: AgentData::new(askit, id, def_name, config),
58            num_waiting_data: Arc::new(Mutex::new(0)),
59        })
60    }
61
62    async fn process(
63        &mut self,
64        ctx: AgentContext,
65        pin: String,
66        value: AgentValue,
67    ) -> Result<(), AgentError> {
68        let config = self.configs()?;
69        let delay_ms = config.get_integer_or(CONFIG_DELAY, DELAY_MS_DEFAULT);
70        let max_num_data = config.get_integer_or(CONFIG_MAX_NUM_DATA, MAX_NUM_DATA_DEFAULT);
71
72        // To avoid generating too many timers
73        {
74            let num_waiting_data = self.num_waiting_data.clone();
75            let mut num_waiting_data = num_waiting_data.lock().unwrap();
76            if *num_waiting_data >= max_num_data {
77                return Ok(());
78            }
79            *num_waiting_data += 1;
80        }
81
82        tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
83
84        self.try_output(ctx.clone(), pin, value.clone())?;
85
86        let mut num_waiting_data = self.num_waiting_data.lock().unwrap();
87        *num_waiting_data -= 1;
88
89        Ok(())
90    }
91}
92
93// Interval Timer Agent
94#[askit_agent(
95    title = "Interval Timer",
96    description = "Outputs a unit signal at specified intervals",
97    category = CATEGORY,
98    outputs = [PIN_UNIT],
99    string_config(name = CONFIG_INTERVAL, default = INTERVAL_DEFAULT, description = "(ex. 10s, 5m, 100ms, 1h, 1d)")
100)]
101struct IntervalTimerAgent {
102    data: AgentData,
103    timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
104    interval_ms: u64,
105}
106
107impl IntervalTimerAgent {
108    fn start_timer(&mut self) -> Result<(), AgentError> {
109        let timer_handle = self.timer_handle.clone();
110        let interval_ms = self.interval_ms;
111
112        let askit = self.askit().clone();
113        let agent_id = self.id().to_string();
114        let handle = self.runtime().spawn(async move {
115            loop {
116                // Sleep for the configured interval
117                tokio::time::sleep(tokio::time::Duration::from_millis(interval_ms)).await;
118
119                // Check if we've been stopped
120                if let Ok(handle) = timer_handle.lock() {
121                    if handle.is_none() {
122                        break;
123                    }
124                }
125
126                // Create a unit output
127                if let Err(e) = askit.try_send_agent_out(
128                    agent_id.clone(),
129                    AgentContext::new(),
130                    PIN_UNIT.to_string(),
131                    AgentValue::unit(),
132                ) {
133                    log::error!("Failed to send interval timer output: {}", e);
134                }
135            }
136        });
137
138        // Store the timer handle
139        if let Ok(mut timer_handle) = self.timer_handle.lock() {
140            *timer_handle = Some(handle);
141        }
142
143        Ok(())
144    }
145
146    fn stop_timer(&mut self) -> Result<(), AgentError> {
147        // Cancel the timer
148        if let Ok(mut timer_handle) = self.timer_handle.lock() {
149            if let Some(handle) = timer_handle.take() {
150                handle.abort();
151            }
152        }
153        Ok(())
154    }
155}
156
157#[async_trait]
158impl AsAgent for IntervalTimerAgent {
159    fn new(
160        askit: ASKit,
161        id: String,
162        def_name: String,
163        config: Option<AgentConfigs>,
164    ) -> Result<Self, AgentError> {
165        let interval = config
166            .as_ref()
167            .ok_or(AgentError::NoConfig)?
168            .get_string_or(CONFIG_INTERVAL, INTERVAL_DEFAULT);
169        let interval_ms = parse_duration_to_ms(&interval)?;
170
171        Ok(Self {
172            data: AgentData::new(askit, id, def_name, config),
173            timer_handle: Default::default(),
174            interval_ms,
175        })
176    }
177
178    async fn start(&mut self) -> Result<(), AgentError> {
179        self.start_timer()
180    }
181
182    async fn stop(&mut self) -> Result<(), AgentError> {
183        self.stop_timer()
184    }
185
186    fn configs_changed(&mut self) -> Result<(), AgentError> {
187        // Check if interval has changed
188        let interval = self.configs()?.get_string(CONFIG_INTERVAL)?;
189        let new_interval = parse_duration_to_ms(&interval)?;
190        if new_interval != self.interval_ms {
191            self.interval_ms = new_interval;
192            if *self.status() == AgentStatus::Start {
193                // Restart the timer with the new interval
194                self.stop_timer()?;
195                self.start_timer()?;
196            }
197        }
198        Ok(())
199    }
200}
201
202// OnStart
203#[askit_agent(
204    title = "On Start",
205    category = CATEGORY,
206    outputs = [PIN_UNIT],
207    integer_config(name = CONFIG_DELAY, default = DELAY_MS_DEFAULT, title = "delay (ms)")
208)]
209struct OnStartAgent {
210    data: AgentData,
211}
212
213#[async_trait]
214impl AsAgent for OnStartAgent {
215    fn new(
216        askit: ASKit,
217        id: String,
218        def_name: String,
219        config: Option<AgentConfigs>,
220    ) -> Result<Self, AgentError> {
221        Ok(Self {
222            data: AgentData::new(askit, id, def_name, config),
223        })
224    }
225
226    async fn start(&mut self) -> Result<(), AgentError> {
227        let config = self.configs()?;
228        let delay_ms = config.get_integer_or(CONFIG_DELAY, DELAY_MS_DEFAULT);
229
230        let askit = self.askit().clone();
231        let agent_id = self.id().to_string();
232
233        self.runtime().spawn(async move {
234            tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
235
236            if let Err(e) = askit.try_send_agent_out(
237                agent_id,
238                AgentContext::new(),
239                PIN_UNIT.to_string(),
240                AgentValue::unit(),
241            ) {
242                log::error!("Failed to send delayed output: {}", e);
243            }
244        });
245
246        Ok(())
247    }
248}
249
250// Schedule Timer Agent
251#[askit_agent(
252    title = "Schedule Timer",
253    category = CATEGORY,
254    outputs = [PIN_TIME],
255    string_config(name = CONFIG_SCHEDULE, default = "0 0 * * * *", description = "sec min hour day month week year")
256)]
257struct ScheduleTimerAgent {
258    data: AgentData,
259    cron_schedule: Option<Schedule>,
260    timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
261}
262
263impl ScheduleTimerAgent {
264    fn start_timer(&mut self) -> Result<(), AgentError> {
265        let Some(schedule) = &self.cron_schedule else {
266            return Err(AgentError::InvalidConfig("No schedule defined".into()));
267        };
268
269        let askit = self.askit().clone();
270        let agent_id = self.id().to_string();
271        let timer_handle = self.timer_handle.clone();
272        let schedule = schedule.clone();
273
274        let handle = self.runtime().spawn(async move {
275            loop {
276                // Calculate the next time this schedule should run
277                let now: DateTime<Utc> = Utc::now();
278                let next = match schedule.upcoming(Utc).next() {
279                    Some(next_time) => next_time,
280                    None => {
281                        log::error!("No upcoming schedule times found");
282                        break;
283                    }
284                };
285
286                // Calculate the duration until the next scheduled time
287                let duration = match (next - now).to_std() {
288                    Ok(duration) => duration,
289                    Err(e) => {
290                        log::error!("Failed to calculate duration until next schedule: {}", e);
291                        // If we can't calculate the duration, sleep for a short time and try again
292                        tokio::time::sleep(Duration::from_secs(60)).await;
293                        continue;
294                    }
295                };
296
297                let next_local = next.with_timezone(&Local);
298                log::debug!(
299                    "Scheduling timer for '{}' to fire at {} (in {:?})",
300                    agent_id,
301                    next_local.format("%Y-%m-%d %H:%M:%S %z"),
302                    duration
303                );
304
305                // Sleep until the next scheduled time
306                tokio::time::sleep(duration).await;
307
308                // Check if we've been stopped
309                if let Ok(handle) = timer_handle.lock() {
310                    if handle.is_none() {
311                        break;
312                    }
313                }
314
315                // Get the current local timestamp (in seconds)
316                let current_local_time = Local::now().timestamp();
317
318                // Output the timestamp as an integer
319                if let Err(e) = askit.try_send_agent_out(
320                    agent_id.clone(),
321                    AgentContext::new(),
322                    PIN_TIME.to_string(),
323                    AgentValue::integer(current_local_time),
324                ) {
325                    log::error!("Failed to send schedule timer output: {}", e);
326                }
327            }
328        });
329
330        // Store the timer handle
331        if let Ok(mut timer_handle) = self.timer_handle.lock() {
332            *timer_handle = Some(handle);
333        }
334
335        Ok(())
336    }
337
338    fn stop_timer(&mut self) -> Result<(), AgentError> {
339        // Cancel the timer
340        if let Ok(mut timer_handle) = self.timer_handle.lock() {
341            if let Some(handle) = timer_handle.take() {
342                handle.abort();
343            }
344        }
345        Ok(())
346    }
347
348    fn parse_schedule(&mut self, schedule_str: &str) -> Result<(), AgentError> {
349        if schedule_str.trim().is_empty() {
350            self.cron_schedule = None;
351            return Ok(());
352        }
353
354        let schedule = Schedule::from_str(schedule_str).map_err(|e| {
355            AgentError::InvalidConfig(format!("Invalid cron schedule '{}': {}", schedule_str, e))
356        })?;
357        self.cron_schedule = Some(schedule);
358        Ok(())
359    }
360}
361
362#[async_trait]
363impl AsAgent for ScheduleTimerAgent {
364    fn new(
365        askit: ASKit,
366        id: String,
367        def_name: String,
368        config: Option<AgentConfigs>,
369    ) -> Result<Self, AgentError> {
370        let mut agent = Self {
371            data: AgentData::new(askit, id, def_name, config.clone()),
372            cron_schedule: None,
373            timer_handle: Default::default(),
374        };
375
376        if let Some(config) = config {
377            let schedule_str = config.get_string(CONFIG_SCHEDULE)?;
378            if !schedule_str.is_empty() {
379                agent.parse_schedule(&schedule_str)?;
380            }
381        }
382
383        Ok(agent)
384    }
385
386    async fn start(&mut self) -> Result<(), AgentError> {
387        if self.cron_schedule.is_some() {
388            self.start_timer()?;
389        }
390        Ok(())
391    }
392
393    async fn stop(&mut self) -> Result<(), AgentError> {
394        self.stop_timer()
395    }
396
397    fn configs_changed(&mut self) -> Result<(), AgentError> {
398        // Check if schedule has changed
399        let schedule_str = self.configs()?.get_string(CONFIG_SCHEDULE)?;
400        self.parse_schedule(&schedule_str)?;
401
402        if *self.status() == AgentStatus::Start {
403            // Restart the timer with the new schedule
404            self.stop_timer()?;
405            if self.cron_schedule.is_some() {
406                self.start_timer()?;
407            }
408        }
409        Ok(())
410    }
411}
412
413// Throttle agent
414#[askit_agent(
415    title = "Throttle Time",
416    category = CATEGORY,
417    inputs = ["*"],
418    outputs = ["*"],
419    string_config(name = CONFIG_TIME, default = TIME_DEFAULT, description = "(ex. 10s, 5m, 100ms, 1h, 1d)"),
420    integer_config(name = CONFIG_MAX_NUM_DATA, title = "max num data", description = "0: no data, -1: all data")
421)]
422struct ThrottleTimeAgent {
423    data: AgentData,
424    timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
425    time_ms: u64,
426    max_num_data: i64,
427    waiting_data: Arc<Mutex<Vec<(AgentContext, String, AgentValue)>>>,
428}
429
430impl ThrottleTimeAgent {
431    fn start_timer(&mut self) -> Result<(), AgentError> {
432        let timer_handle = self.timer_handle.clone();
433        let time_ms = self.time_ms;
434
435        let waiting_data = self.waiting_data.clone();
436        let askit = self.askit().clone();
437        let agent_id = self.id().to_string();
438
439        let handle = self.runtime().spawn(async move {
440            loop {
441                // Sleep for the configured interval
442                tokio::time::sleep(tokio::time::Duration::from_millis(time_ms)).await;
443
444                // Check if we've been stopped
445                let mut handle = timer_handle.lock().unwrap();
446                if handle.is_none() {
447                    break;
448                }
449
450                // process the waiting data
451                let mut wd = waiting_data.lock().unwrap();
452                if wd.len() > 0 {
453                    // If there are data waiting, output the first one
454                    let (ctx, pin, data) = wd.remove(0);
455                    askit
456                        .try_send_agent_out(agent_id.clone(), ctx, pin, data)
457                        .unwrap_or_else(|e| {
458                            log::error!("Failed to send delayed output: {}", e);
459                        });
460                }
461
462                // If there are no data waiting, we stop the timer
463                if wd.len() == 0 {
464                    handle.take();
465                    break;
466                }
467            }
468        });
469
470        // Store the timer handle
471        if let Ok(mut timer_handle) = self.timer_handle.lock() {
472            *timer_handle = Some(handle);
473        }
474
475        Ok(())
476    }
477
478    fn stop_timer(&mut self) -> Result<(), AgentError> {
479        // Cancel the timer
480        if let Ok(mut timer_handle) = self.timer_handle.lock() {
481            if let Some(handle) = timer_handle.take() {
482                handle.abort();
483            }
484        }
485        Ok(())
486    }
487}
488
489#[async_trait]
490impl AsAgent for ThrottleTimeAgent {
491    fn new(
492        askit: ASKit,
493        id: String,
494        def_name: String,
495        config: Option<AgentConfigs>,
496    ) -> Result<Self, AgentError> {
497        let time = config
498            .as_ref()
499            .ok_or(AgentError::NoConfig)?
500            .get_string_or(CONFIG_TIME, TIME_DEFAULT);
501        let time_ms = parse_duration_to_ms(&time)?;
502
503        let max_num_data = config
504            .as_ref()
505            .ok_or(AgentError::NoConfig)?
506            .get_integer_or(CONFIG_MAX_NUM_DATA, 0);
507
508        Ok(Self {
509            data: AgentData::new(askit, id, def_name, config),
510            timer_handle: Default::default(),
511            time_ms,
512            max_num_data,
513            waiting_data: Arc::new(Mutex::new(vec![])),
514        })
515    }
516
517    async fn stop(&mut self) -> Result<(), AgentError> {
518        self.stop_timer()
519    }
520
521    fn configs_changed(&mut self) -> Result<(), AgentError> {
522        // Check if interval has changed
523        let time = self.configs()?.get_string(CONFIG_TIME)?;
524        let new_time = parse_duration_to_ms(&time)?;
525        if new_time != self.time_ms {
526            self.time_ms = new_time;
527        }
528
529        // Check if max_num_data has changed
530        let max_num_data = self.configs()?.get_integer(CONFIG_MAX_NUM_DATA)?;
531        if self.max_num_data != max_num_data {
532            let mut wd = self.waiting_data.lock().unwrap();
533            let wd_len = wd.len();
534            if max_num_data >= 0 && wd_len > (max_num_data as usize) {
535                // If we have reached the max data to keep, we drop the oldest one
536                wd.drain(0..(wd_len - (max_num_data as usize)));
537            }
538            self.max_num_data = max_num_data;
539        }
540        Ok(())
541    }
542
543    async fn process(
544        &mut self,
545        ctx: AgentContext,
546        pin: String,
547        value: AgentValue,
548    ) -> Result<(), AgentError> {
549        if self.timer_handle.lock().unwrap().is_some() {
550            // If the timer is running, we just add the data to the waiting list
551            let mut wd = self.waiting_data.lock().unwrap();
552
553            // If max_num_data is 0, we don't need to keep any data
554            if self.max_num_data == 0 {
555                return Ok(());
556            }
557
558            wd.push((ctx, pin, value));
559            if self.max_num_data > 0 && wd.len() > self.max_num_data as usize {
560                // If we have reached the max data to keep, we drop the oldest one
561                wd.remove(0);
562            }
563
564            return Ok(());
565        }
566
567        // Start the timer
568        self.start_timer()?;
569
570        // Output the data
571        self.try_output(ctx, pin, value)?;
572
573        Ok(())
574    }
575}
576
577// Parse time duration strings like "2s", "10m", "200ms"
578fn parse_duration_to_ms(duration_str: &str) -> Result<u64, AgentError> {
579    const MIN_DURATION: u64 = 10;
580
581    // Regular expression to match number followed by optional unit
582    let re = Regex::new(r"^(\d+)(?:([a-zA-Z]+))?$").expect("Failed to compile regex");
583
584    if let Some(captures) = re.captures(duration_str.trim()) {
585        let value: u64 = captures.get(1).unwrap().as_str().parse().map_err(|e| {
586            AgentError::InvalidConfig(format!(
587                "Invalid number in duration '{}': {}",
588                duration_str, e
589            ))
590        })?;
591
592        // Get the unit if present, default to "s" (seconds)
593        let unit = captures
594            .get(2)
595            .map_or("s".to_string(), |m| m.as_str().to_lowercase());
596
597        // Convert to milliseconds based on unit
598        let milliseconds = match unit.as_str() {
599            "ms" => value,               // already in milliseconds
600            "s" => value * 1000,         // seconds to milliseconds
601            "m" => value * 60 * 1000,    // minutes to milliseconds
602            "h" => value * 3600 * 1000,  // hours to milliseconds
603            "d" => value * 86400 * 1000, // days to milliseconds
604            _ => {
605                return Err(AgentError::InvalidConfig(format!(
606                    "Unknown time unit: {}",
607                    unit
608                )));
609            }
610        };
611
612        // Ensure we don't return less than the minimum duration
613        Ok(std::cmp::max(milliseconds, MIN_DURATION))
614    } else {
615        // If the string doesn't match the pattern, try to parse it as a plain number
616        // and assume it's in seconds
617        let value: u64 = duration_str.parse().map_err(|e| {
618            AgentError::InvalidConfig(format!("Invalid duration format '{}': {}", duration_str, e))
619        })?;
620        Ok(std::cmp::max(value * 1000, MIN_DURATION)) // Convert to ms
621    }
622}