// askit_std_agents/time.rs

1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4use std::vec;
5
6use agent_stream_kit::{
7    ASKit, Agent, AgentContext, AgentData, AgentError, AgentOutput, AgentSpec, AgentStatus,
8    AgentValue, AsAgent, askit_agent, async_trait,
9};
10use chrono::{DateTime, Local, Utc};
11use cron::Schedule;
12use log;
13use regex::Regex;
14use tokio::task::JoinHandle;
15
// Category used by every agent declared in this file.
static CATEGORY: &str = "Std/Time";

// Output pin names.
static PIN_TIME: &str = "time";
static PIN_UNIT: &str = "unit";

// Configuration keys.
static CONFIG_DELAY: &str = "delay";
static CONFIG_MAX_NUM_DATA: &str = "max_num_data";
static CONFIG_INTERVAL: &str = "interval";
static CONFIG_SCHEDULE: &str = "schedule";
static CONFIG_TIME: &str = "time";

// Default configuration values.
const DELAY_MS_DEFAULT: i64 = 1000; // milliseconds (1 second)
const MAX_NUM_DATA_DEFAULT: i64 = 10;
static INTERVAL_DEFAULT: &str = "10s";
static TIME_DEFAULT: &str = "1s";
31
32// Delay Agent
33#[askit_agent(
34    title = "Delay",
35    description = "Delays output by a specified time",
36    category = CATEGORY,
37    inputs = ["*"],
38    outputs = ["*"],
39    integer_config(name = CONFIG_DELAY, default = DELAY_MS_DEFAULT, title = "delay (ms)"),
40    integer_config(name = CONFIG_MAX_NUM_DATA, default = MAX_NUM_DATA_DEFAULT, title = "max num data")
41)]
42struct DelayAgent {
43    data: AgentData,
44    num_waiting_data: Arc<Mutex<i64>>,
45}
46
47#[async_trait]
48impl AsAgent for DelayAgent {
49    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
50        Ok(Self {
51            data: AgentData::new(askit, id, spec),
52            num_waiting_data: Arc::new(Mutex::new(0)),
53        })
54    }
55
56    async fn process(
57        &mut self,
58        ctx: AgentContext,
59        pin: String,
60        value: AgentValue,
61    ) -> Result<(), AgentError> {
62        let config = self.configs()?;
63        let delay_ms = config.get_integer_or(CONFIG_DELAY, DELAY_MS_DEFAULT);
64        let max_num_data = config.get_integer_or(CONFIG_MAX_NUM_DATA, MAX_NUM_DATA_DEFAULT);
65
66        // To avoid generating too many timers
67        {
68            let num_waiting_data = self.num_waiting_data.clone();
69            let mut num_waiting_data = num_waiting_data.lock().unwrap();
70            if *num_waiting_data >= max_num_data {
71                return Ok(());
72            }
73            *num_waiting_data += 1;
74        }
75
76        tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
77
78        self.try_output(ctx.clone(), pin, value.clone())?;
79
80        let mut num_waiting_data = self.num_waiting_data.lock().unwrap();
81        *num_waiting_data -= 1;
82
83        Ok(())
84    }
85}
86
87// Interval Timer Agent
88#[askit_agent(
89    title = "Interval Timer",
90    description = "Outputs a unit signal at specified intervals",
91    category = CATEGORY,
92    outputs = [PIN_UNIT],
93    string_config(name = CONFIG_INTERVAL, default = INTERVAL_DEFAULT, description = "(ex. 10s, 5m, 100ms, 1h, 1d)")
94)]
95struct IntervalTimerAgent {
96    data: AgentData,
97    timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
98    interval_ms: u64,
99}
100
101impl IntervalTimerAgent {
102    fn start_timer(&mut self) -> Result<(), AgentError> {
103        let timer_handle = self.timer_handle.clone();
104        let interval_ms = self.interval_ms;
105
106        let askit = self.askit().clone();
107        let agent_id = self.id().to_string();
108        let handle = self.runtime().spawn(async move {
109            loop {
110                // Sleep for the configured interval
111                tokio::time::sleep(tokio::time::Duration::from_millis(interval_ms)).await;
112
113                // Check if we've been stopped
114                if let Ok(handle) = timer_handle.lock() {
115                    if handle.is_none() {
116                        break;
117                    }
118                }
119
120                // Create a unit output
121                if let Err(e) = askit.try_send_agent_out(
122                    agent_id.clone(),
123                    AgentContext::new(),
124                    PIN_UNIT.to_string(),
125                    AgentValue::unit(),
126                ) {
127                    log::error!("Failed to send interval timer output: {}", e);
128                }
129            }
130        });
131
132        // Store the timer handle
133        if let Ok(mut timer_handle) = self.timer_handle.lock() {
134            *timer_handle = Some(handle);
135        }
136
137        Ok(())
138    }
139
140    fn stop_timer(&mut self) -> Result<(), AgentError> {
141        // Cancel the timer
142        if let Ok(mut timer_handle) = self.timer_handle.lock() {
143            if let Some(handle) = timer_handle.take() {
144                handle.abort();
145            }
146        }
147        Ok(())
148    }
149}
150
151#[async_trait]
152impl AsAgent for IntervalTimerAgent {
153    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
154        let interval = spec
155            .configs
156            .as_ref()
157            .ok_or(AgentError::NoConfig)?
158            .get_string_or(CONFIG_INTERVAL, INTERVAL_DEFAULT);
159        let interval_ms = parse_duration_to_ms(&interval)?;
160
161        Ok(Self {
162            data: AgentData::new(askit, id, spec),
163            timer_handle: Default::default(),
164            interval_ms,
165        })
166    }
167
168    async fn start(&mut self) -> Result<(), AgentError> {
169        self.start_timer()
170    }
171
172    async fn stop(&mut self) -> Result<(), AgentError> {
173        self.stop_timer()
174    }
175
176    fn configs_changed(&mut self) -> Result<(), AgentError> {
177        // Check if interval has changed
178        let interval = self.configs()?.get_string(CONFIG_INTERVAL)?;
179        let new_interval = parse_duration_to_ms(&interval)?;
180        if new_interval != self.interval_ms {
181            self.interval_ms = new_interval;
182            if *self.status() == AgentStatus::Start {
183                // Restart the timer with the new interval
184                self.stop_timer()?;
185                self.start_timer()?;
186            }
187        }
188        Ok(())
189    }
190}
191
192// OnStart
193#[askit_agent(
194    title = "On Start",
195    category = CATEGORY,
196    outputs = [PIN_UNIT],
197    integer_config(name = CONFIG_DELAY, default = DELAY_MS_DEFAULT, title = "delay (ms)")
198)]
199struct OnStartAgent {
200    data: AgentData,
201}
202
203#[async_trait]
204impl AsAgent for OnStartAgent {
205    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
206        Ok(Self {
207            data: AgentData::new(askit, id, spec),
208        })
209    }
210
211    async fn start(&mut self) -> Result<(), AgentError> {
212        let config = self.configs()?;
213        let delay_ms = config.get_integer_or(CONFIG_DELAY, DELAY_MS_DEFAULT);
214
215        let askit = self.askit().clone();
216        let agent_id = self.id().to_string();
217
218        self.runtime().spawn(async move {
219            tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
220
221            if let Err(e) = askit.try_send_agent_out(
222                agent_id,
223                AgentContext::new(),
224                PIN_UNIT.to_string(),
225                AgentValue::unit(),
226            ) {
227                log::error!("Failed to send delayed output: {}", e);
228            }
229        });
230
231        Ok(())
232    }
233}
234
235// Schedule Timer Agent
236#[askit_agent(
237    title = "Schedule Timer",
238    category = CATEGORY,
239    outputs = [PIN_TIME],
240    string_config(name = CONFIG_SCHEDULE, default = "0 0 * * * *", description = "sec min hour day month week year")
241)]
242struct ScheduleTimerAgent {
243    data: AgentData,
244    cron_schedule: Option<Schedule>,
245    timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
246}
247
248impl ScheduleTimerAgent {
249    fn start_timer(&mut self) -> Result<(), AgentError> {
250        let Some(schedule) = &self.cron_schedule else {
251            return Err(AgentError::InvalidConfig("No schedule defined".into()));
252        };
253
254        let askit = self.askit().clone();
255        let agent_id = self.id().to_string();
256        let timer_handle = self.timer_handle.clone();
257        let schedule = schedule.clone();
258
259        let handle = self.runtime().spawn(async move {
260            loop {
261                // Calculate the next time this schedule should run
262                let now: DateTime<Utc> = Utc::now();
263                let next = match schedule.upcoming(Utc).next() {
264                    Some(next_time) => next_time,
265                    None => {
266                        log::error!("No upcoming schedule times found");
267                        break;
268                    }
269                };
270
271                // Calculate the duration until the next scheduled time
272                let duration = match (next - now).to_std() {
273                    Ok(duration) => duration,
274                    Err(e) => {
275                        log::error!("Failed to calculate duration until next schedule: {}", e);
276                        // If we can't calculate the duration, sleep for a short time and try again
277                        tokio::time::sleep(Duration::from_secs(60)).await;
278                        continue;
279                    }
280                };
281
282                let next_local = next.with_timezone(&Local);
283                log::debug!(
284                    "Scheduling timer for '{}' to fire at {} (in {:?})",
285                    agent_id,
286                    next_local.format("%Y-%m-%d %H:%M:%S %z"),
287                    duration
288                );
289
290                // Sleep until the next scheduled time
291                tokio::time::sleep(duration).await;
292
293                // Check if we've been stopped
294                if let Ok(handle) = timer_handle.lock() {
295                    if handle.is_none() {
296                        break;
297                    }
298                }
299
300                // Get the current local timestamp (in seconds)
301                let current_local_time = Local::now().timestamp();
302
303                // Output the timestamp as an integer
304                if let Err(e) = askit.try_send_agent_out(
305                    agent_id.clone(),
306                    AgentContext::new(),
307                    PIN_TIME.to_string(),
308                    AgentValue::integer(current_local_time),
309                ) {
310                    log::error!("Failed to send schedule timer output: {}", e);
311                }
312            }
313        });
314
315        // Store the timer handle
316        if let Ok(mut timer_handle) = self.timer_handle.lock() {
317            *timer_handle = Some(handle);
318        }
319
320        Ok(())
321    }
322
323    fn stop_timer(&mut self) -> Result<(), AgentError> {
324        // Cancel the timer
325        if let Ok(mut timer_handle) = self.timer_handle.lock() {
326            if let Some(handle) = timer_handle.take() {
327                handle.abort();
328            }
329        }
330        Ok(())
331    }
332
333    fn parse_schedule(&mut self, schedule_str: &str) -> Result<(), AgentError> {
334        if schedule_str.trim().is_empty() {
335            self.cron_schedule = None;
336            return Ok(());
337        }
338
339        let schedule = Schedule::from_str(schedule_str).map_err(|e| {
340            AgentError::InvalidConfig(format!("Invalid cron schedule '{}': {}", schedule_str, e))
341        })?;
342        self.cron_schedule = Some(schedule);
343        Ok(())
344    }
345}
346
347#[async_trait]
348impl AsAgent for ScheduleTimerAgent {
349    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
350        let schedule_str = spec
351            .configs
352            .as_ref()
353            .map(|cfg| cfg.get_string(CONFIG_SCHEDULE))
354            .transpose()?;
355
356        let mut agent = Self {
357            data: AgentData::new(askit, id, spec),
358            cron_schedule: None,
359            timer_handle: Default::default(),
360        };
361
362        if let Some(schedule_str) = schedule_str {
363            if !schedule_str.is_empty() {
364                agent.parse_schedule(&schedule_str)?;
365            }
366        }
367
368        Ok(agent)
369    }
370
371    async fn start(&mut self) -> Result<(), AgentError> {
372        if self.cron_schedule.is_some() {
373            self.start_timer()?;
374        }
375        Ok(())
376    }
377
378    async fn stop(&mut self) -> Result<(), AgentError> {
379        self.stop_timer()
380    }
381
382    fn configs_changed(&mut self) -> Result<(), AgentError> {
383        // Check if schedule has changed
384        let schedule_str = self.configs()?.get_string(CONFIG_SCHEDULE)?;
385        self.parse_schedule(&schedule_str)?;
386
387        if *self.status() == AgentStatus::Start {
388            // Restart the timer with the new schedule
389            self.stop_timer()?;
390            if self.cron_schedule.is_some() {
391                self.start_timer()?;
392            }
393        }
394        Ok(())
395    }
396}
397
398// Throttle agent
399#[askit_agent(
400    title = "Throttle Time",
401    category = CATEGORY,
402    inputs = ["*"],
403    outputs = ["*"],
404    string_config(name = CONFIG_TIME, default = TIME_DEFAULT, description = "(ex. 10s, 5m, 100ms, 1h, 1d)"),
405    integer_config(name = CONFIG_MAX_NUM_DATA, title = "max num data", description = "0: no data, -1: all data")
406)]
407struct ThrottleTimeAgent {
408    data: AgentData,
409    timer_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
410    time_ms: u64,
411    max_num_data: i64,
412    waiting_data: Arc<Mutex<Vec<(AgentContext, String, AgentValue)>>>,
413}
414
415impl ThrottleTimeAgent {
416    fn start_timer(&mut self) -> Result<(), AgentError> {
417        let timer_handle = self.timer_handle.clone();
418        let time_ms = self.time_ms;
419
420        let waiting_data = self.waiting_data.clone();
421        let askit = self.askit().clone();
422        let agent_id = self.id().to_string();
423
424        let handle = self.runtime().spawn(async move {
425            loop {
426                // Sleep for the configured interval
427                tokio::time::sleep(tokio::time::Duration::from_millis(time_ms)).await;
428
429                // Check if we've been stopped
430                let mut handle = timer_handle.lock().unwrap();
431                if handle.is_none() {
432                    break;
433                }
434
435                // process the waiting data
436                let mut wd = waiting_data.lock().unwrap();
437                if wd.len() > 0 {
438                    // If there are data waiting, output the first one
439                    let (ctx, pin, data) = wd.remove(0);
440                    askit
441                        .try_send_agent_out(agent_id.clone(), ctx, pin, data)
442                        .unwrap_or_else(|e| {
443                            log::error!("Failed to send delayed output: {}", e);
444                        });
445                }
446
447                // If there are no data waiting, we stop the timer
448                if wd.len() == 0 {
449                    handle.take();
450                    break;
451                }
452            }
453        });
454
455        // Store the timer handle
456        if let Ok(mut timer_handle) = self.timer_handle.lock() {
457            *timer_handle = Some(handle);
458        }
459
460        Ok(())
461    }
462
463    fn stop_timer(&mut self) -> Result<(), AgentError> {
464        // Cancel the timer
465        if let Ok(mut timer_handle) = self.timer_handle.lock() {
466            if let Some(handle) = timer_handle.take() {
467                handle.abort();
468            }
469        }
470        Ok(())
471    }
472}
473
474#[async_trait]
475impl AsAgent for ThrottleTimeAgent {
476    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
477        let time = spec
478            .configs
479            .as_ref()
480            .ok_or(AgentError::NoConfig)?
481            .get_string_or(CONFIG_TIME, TIME_DEFAULT);
482        let time_ms = parse_duration_to_ms(&time)?;
483
484        let max_num_data = spec
485            .configs
486            .as_ref()
487            .ok_or(AgentError::NoConfig)?
488            .get_integer_or(CONFIG_MAX_NUM_DATA, 0);
489
490        Ok(Self {
491            data: AgentData::new(askit, id, spec),
492            timer_handle: Default::default(),
493            time_ms,
494            max_num_data,
495            waiting_data: Arc::new(Mutex::new(vec![])),
496        })
497    }
498
499    async fn stop(&mut self) -> Result<(), AgentError> {
500        self.stop_timer()
501    }
502
503    fn configs_changed(&mut self) -> Result<(), AgentError> {
504        // Check if interval has changed
505        let time = self.configs()?.get_string(CONFIG_TIME)?;
506        let new_time = parse_duration_to_ms(&time)?;
507        if new_time != self.time_ms {
508            self.time_ms = new_time;
509        }
510
511        // Check if max_num_data has changed
512        let max_num_data = self.configs()?.get_integer(CONFIG_MAX_NUM_DATA)?;
513        if self.max_num_data != max_num_data {
514            let mut wd = self.waiting_data.lock().unwrap();
515            let wd_len = wd.len();
516            if max_num_data >= 0 && wd_len > (max_num_data as usize) {
517                // If we have reached the max data to keep, we drop the oldest one
518                wd.drain(0..(wd_len - (max_num_data as usize)));
519            }
520            self.max_num_data = max_num_data;
521        }
522        Ok(())
523    }
524
525    async fn process(
526        &mut self,
527        ctx: AgentContext,
528        pin: String,
529        value: AgentValue,
530    ) -> Result<(), AgentError> {
531        if self.timer_handle.lock().unwrap().is_some() {
532            // If the timer is running, we just add the data to the waiting list
533            let mut wd = self.waiting_data.lock().unwrap();
534
535            // If max_num_data is 0, we don't need to keep any data
536            if self.max_num_data == 0 {
537                return Ok(());
538            }
539
540            wd.push((ctx, pin, value));
541            if self.max_num_data > 0 && wd.len() > self.max_num_data as usize {
542                // If we have reached the max data to keep, we drop the oldest one
543                wd.remove(0);
544            }
545
546            return Ok(());
547        }
548
549        // Start the timer
550        self.start_timer()?;
551
552        // Output the data
553        self.try_output(ctx, pin, value)?;
554
555        Ok(())
556    }
557}
558
559// Parse time duration strings like "2s", "10m", "200ms"
560fn parse_duration_to_ms(duration_str: &str) -> Result<u64, AgentError> {
561    const MIN_DURATION: u64 = 10;
562
563    // Regular expression to match number followed by optional unit
564    let re = Regex::new(r"^(\d+)(?:([a-zA-Z]+))?$").expect("Failed to compile regex");
565
566    if let Some(captures) = re.captures(duration_str.trim()) {
567        let value: u64 = captures.get(1).unwrap().as_str().parse().map_err(|e| {
568            AgentError::InvalidConfig(format!(
569                "Invalid number in duration '{}': {}",
570                duration_str, e
571            ))
572        })?;
573
574        // Get the unit if present, default to "s" (seconds)
575        let unit = captures
576            .get(2)
577            .map_or("s".to_string(), |m| m.as_str().to_lowercase());
578
579        // Convert to milliseconds based on unit
580        let milliseconds = match unit.as_str() {
581            "ms" => value,               // already in milliseconds
582            "s" => value * 1000,         // seconds to milliseconds
583            "m" => value * 60 * 1000,    // minutes to milliseconds
584            "h" => value * 3600 * 1000,  // hours to milliseconds
585            "d" => value * 86400 * 1000, // days to milliseconds
586            _ => {
587                return Err(AgentError::InvalidConfig(format!(
588                    "Unknown time unit: {}",
589                    unit
590                )));
591            }
592        };
593
594        // Ensure we don't return less than the minimum duration
595        Ok(std::cmp::max(milliseconds, MIN_DURATION))
596    } else {
597        // If the string doesn't match the pattern, try to parse it as a plain number
598        // and assume it's in seconds
599        let value: u64 = duration_str.parse().map_err(|e| {
600            AgentError::InvalidConfig(format!("Invalid duration format '{}': {}", duration_str, e))
601        })?;
602        Ok(std::cmp::max(value * 1000, MIN_DURATION)) // Convert to ms
603    }
604}