Skip to main content

sim_lib_server/trigger/
runtime.rs

1use std::{
2    fs,
3    path::PathBuf,
4    sync::{
5        Arc, Mutex,
6        atomic::{AtomicBool, Ordering},
7        mpsc,
8    },
9    thread,
10    time::{Duration, SystemTime, UNIX_EPOCH},
11};
12
13use sim_kernel::{Args, Cx, Error, Expr, Result, Symbol, Value};
14
15use crate::{
16    Server, ServerAddress, ServerFrame, decode_frame_payload, helpers::clone_server_cx,
17    symbol_list_value,
18};
19
20#[cfg(not(test))]
21use super::sources::spawn_stdin_reader;
22#[cfg(test)]
23use super::sources::{remove_test_stdin_sender, test_stdin_sender, test_stdin_senders};
24use super::{
25    NEXT_TRIGGER_ID, StdinSource, TriggerConfig, TriggerDecoder, TriggerHandle, TriggerState,
26    delivery_timeout, io_error_to_host, source_capability, trigger_read_policy,
27};
28
29impl TriggerHandle {
30    pub(super) fn new(server: Arc<Server>, config: TriggerConfig) -> Self {
31        let TriggerConfig {
32            source,
33            source_expr,
34            role,
35            codec,
36            decode_expr,
37            decoder,
38            cron,
39            network_source,
40        } = config;
41        let id = NEXT_TRIGGER_ID.fetch_add(1, Ordering::Relaxed);
42        let stdin = match &source {
43            ServerAddress::Stdin => {
44                let (tx, rx) = mpsc::channel();
45                #[cfg(not(test))]
46                spawn_stdin_reader(tx);
47                #[cfg(test)]
48                {
49                    test_stdin_senders()
50                        .lock()
51                        .expect("test stdin sender registry poisoned")
52                        .insert(id, tx);
53                }
54                StdinSource::Channel(Mutex::new(rx))
55            }
56            _ => StdinSource::Unavailable,
57        };
58        Self {
59            id,
60            server,
61            source,
62            source_expr,
63            role,
64            codec,
65            decode_expr,
66            decoder,
67            cron,
68            network_source: network_source.map(Mutex::new),
69            stopping: AtomicBool::new(false),
70            handle: Mutex::new(None),
71            stdin,
72            state: Mutex::new(TriggerState::default()),
73        }
74    }
75
76    pub(crate) fn start(self: &Arc<Self>, seed: &Cx) -> Result<()> {
77        let mut handle = self
78            .handle
79            .lock()
80            .map_err(|_| Error::PoisonedLock("trigger handle"))?;
81        if handle.is_some() {
82            return Ok(());
83        }
84        let mut cx = clone_server_cx(seed);
85        let trigger = Arc::clone(self);
86        *handle = Some(thread::spawn(move || {
87            trigger.run(&mut cx);
88        }));
89        Ok(())
90    }
91
92    pub(crate) fn stop(&self) -> Result<()> {
93        self.stopping.store(true, Ordering::Relaxed);
94        #[cfg(test)]
95        remove_test_stdin_sender(self.id);
96        let join = self
97            .handle
98            .lock()
99            .map_err(|_| Error::PoisonedLock("trigger handle"))?
100            .take();
101        if let Some(join) = join {
102            join.join()
103                .map_err(|_| Error::HostError("trigger thread panicked".to_owned()))?;
104        }
105        Ok(())
106    }
107
108    pub(crate) fn poll(&self, cx: &mut Cx) -> Result<u64> {
109        match &self.source {
110            ServerAddress::Stdin => self.poll_stdin(cx),
111            ServerAddress::FileTail { path } => self.poll_file_tail(cx, path),
112            ServerAddress::Cron { .. } => self.poll_cron(cx),
113            ServerAddress::Webhook { .. }
114            | ServerAddress::Imap { .. }
115            | ServerAddress::Smtp { .. }
116            | ServerAddress::Telegram { .. }
117            | ServerAddress::Matrix { .. } => self.poll_network_source(cx),
118            other => Err(Error::Eval(format!(
119                "server/trigger does not support source {}",
120                other.kind_symbol()
121            ))),
122        }
123    }
124
125    #[cfg(test)]
126    pub(crate) fn inject_text(&self, cx: &mut Cx, text: &str) -> Result<u64> {
127        let mut delivered = 0;
128        for line in text.lines() {
129            if line.trim().is_empty() {
130                continue;
131            }
132            self.inject_event(cx, line.as_bytes())?;
133            delivered += 1;
134        }
135        Ok(delivered)
136    }
137
138    #[cfg(test)]
139    pub(crate) fn feed_stdin(&self, text: &str) -> Result<()> {
140        let sender = test_stdin_sender(self.id)?;
141        for line in text.lines() {
142            sender
143                .send(Some(format!("{line}\n").into_bytes()))
144                .map_err(|_| Error::HostError("stdin trigger source closed".to_owned()))?;
145        }
146        Ok(())
147    }
148
149    #[cfg(test)]
150    pub(crate) fn finish_stdin(&self) -> Result<()> {
151        test_stdin_sender(self.id)?
152            .send(None)
153            .map_err(|_| Error::HostError("stdin trigger source closed".to_owned()))
154    }
155
156    #[cfg(test)]
157    pub(crate) fn is_source_closed(&self) -> bool {
158        self.state
159            .lock()
160            .map(|state| state.source_closed)
161            .unwrap_or(true)
162    }
163
164    #[cfg(all(test, feature = "trigger-webhook"))]
165    pub(crate) fn webhook_port(&self) -> Result<Option<u16>> {
166        let Some(source) = &self.network_source else {
167            return Ok(None);
168        };
169        let source = source
170            .lock()
171            .map_err(|_| Error::PoisonedLock("trigger source"))?;
172        let Some(source) = source
173            .as_any()
174            .downcast_ref::<super::sources::network::WebhookSource>()
175        else {
176            return Ok(None);
177        };
178        source.local_port().map(Some)
179    }
180
181    /// Reflects the trigger's state as a descriptor table value.
182    ///
183    /// Includes its kind, role, delivered-event count, and required
184    /// capabilities.
185    pub fn reflect_value(&self, cx: &mut Cx) -> Result<Value> {
186        let role = match &self.role {
187            Some(role) => cx.factory().symbol(role.clone())?,
188            None => cx.factory().nil()?,
189        };
190        let delivered = self
191            .state
192            .lock()
193            .map_err(|_| Error::PoisonedLock("trigger state"))?
194            .delivered;
195        let capabilities = match source_capability(&self.source) {
196            Some(capability) => symbol_list_value(cx, &[capability.as_symbol()])?,
197            None => cx.factory().list(Vec::new())?,
198        };
199        cx.factory().table(vec![
200            (
201                Symbol::new("kind"),
202                cx.factory().symbol(Symbol::new("trigger"))?,
203            ),
204            (Symbol::new("id"), cx.factory().string(self.id.to_string())?),
205            (
206                Symbol::new("source"),
207                cx.factory().expr(self.source_expr.clone())?,
208            ),
209            (Symbol::new("role"), role),
210            (
211                Symbol::new("codec"),
212                cx.factory().symbol(self.codec.clone())?,
213            ),
214            (
215                Symbol::new("decode"),
216                cx.factory().expr(self.decode_expr.clone())?,
217            ),
218            (
219                Symbol::new("delivered"),
220                cx.factory().string(delivered.to_string())?,
221            ),
222            (Symbol::new("requires"), capabilities),
223        ])
224    }
225
226    fn poll_file_tail(&self, cx: &mut Cx, path: &PathBuf) -> Result<u64> {
227        let mut bytes = fs::read(path).map_err(io_error_to_host)?;
228        let lines = {
229            let mut state = self
230                .state
231                .lock()
232                .map_err(|_| Error::PoisonedLock("trigger state"))?;
233            if state.file_offset > bytes.len() {
234                state.file_offset = 0;
235                state.file_remainder.clear();
236            }
237            let mut pending = std::mem::take(&mut state.file_remainder);
238            pending.extend_from_slice(&bytes[state.file_offset..]);
239            state.file_offset = bytes.len();
240
241            let mut lines = Vec::new();
242            let mut start = 0usize;
243            for index in 0..pending.len() {
244                if pending[index] == b'\n' {
245                    let mut line = pending[start..index].to_vec();
246                    if line.last() == Some(&b'\r') {
247                        line.pop();
248                    }
249                    lines.push(line);
250                    start = index + 1;
251                }
252            }
253            state.file_remainder = pending[start..].to_vec();
254            lines
255        };
256        bytes.clear();
257
258        let mut delivered = 0;
259        for line in lines {
260            if line.iter().all(u8::is_ascii_whitespace) {
261                continue;
262            }
263            self.inject_event(cx, &line)?;
264            delivered += 1;
265        }
266        Ok(delivered)
267    }
268
269    fn poll_stdin(&self, cx: &mut Cx) -> Result<u64> {
270        let receiver = match &self.stdin {
271            StdinSource::Channel(receiver) => receiver,
272            StdinSource::Unavailable => return Ok(0),
273        };
274        let mut delivered = 0;
275        loop {
276            match receiver
277                .lock()
278                .map_err(|_| Error::PoisonedLock("stdin trigger source"))?
279                .try_recv()
280            {
281                Ok(Some(mut line)) => {
282                    if line.last() == Some(&b'\n') {
283                        line.pop();
284                    }
285                    if line.last() == Some(&b'\r') {
286                        line.pop();
287                    }
288                    if line.iter().all(u8::is_ascii_whitespace) {
289                        continue;
290                    }
291                    self.inject_event(cx, &line)?;
292                    delivered += 1;
293                }
294                Ok(None) | Err(mpsc::TryRecvError::Disconnected) => {
295                    self.state
296                        .lock()
297                        .map_err(|_| Error::PoisonedLock("trigger state"))?
298                        .source_closed = true;
299                    break;
300                }
301                Err(mpsc::TryRecvError::Empty) => break,
302            }
303        }
304        Ok(delivered)
305    }
306
307    fn poll_network_source(&self, cx: &mut Cx) -> Result<u64> {
308        let Some(source) = &self.network_source else {
309            return Ok(0);
310        };
311        let mut delivered = 0;
312        loop {
313            let timeout = if delivered == 0 {
314                delivery_timeout()
315            } else {
316                Duration::from_millis(0)
317            };
318            let event = source
319                .lock()
320                .map_err(|_| Error::PoisonedLock("trigger source"))?
321                .next_event(cx, timeout)?;
322            let Some(event) = event else {
323                break;
324            };
325            self.inject_event(cx, &event)?;
326            source
327                .lock()
328                .map_err(|_| Error::PoisonedLock("trigger source"))?
329                .ack(cx)?;
330            delivered += 1;
331        }
332        Ok(delivered)
333    }
334
335    fn poll_cron(&self, cx: &mut Cx) -> Result<u64> {
336        let Some(matcher) = &self.cron else {
337            return Ok(0);
338        };
339        let Some(minute_key) = matcher.current_match(SystemTime::now()) else {
340            return Ok(0);
341        };
342        {
343            let mut state = self
344                .state
345                .lock()
346                .map_err(|_| Error::PoisonedLock("trigger state"))?;
347            if state.last_cron_minute == Some(minute_key) {
348                return Ok(0);
349            }
350            state.last_cron_minute = Some(minute_key);
351        }
352        self.inject_event(cx, b"tick")?;
353        Ok(1)
354    }
355
356    fn inject_event(&self, cx: &mut Cx, raw: &[u8]) -> Result<()> {
357        if let Some(capability) = source_capability(&self.source) {
358            cx.require(&capability)?;
359        }
360        let expr = self.decode_event(cx, raw)?;
361        let source = self.source.kind_symbol();
362        let when_ms = SystemTime::now()
363            .duration_since(UNIX_EPOCH)
364            .map(|duration| duration.as_millis() as u64)
365            .unwrap_or(0);
366        let mut frame = ServerFrame::from_expr(
367            cx,
368            self.codec.clone(),
369            crate::FrameKind::Trigger {
370                source: source.clone(),
371                when_ms,
372            },
373            &expr,
374            sim_kernel::Consistency::LocalFirst,
375            Vec::new(),
376            false,
377        )?;
378        frame.envelope.role = self.role.clone();
379        frame.envelope.trigger_source = Some(source);
380        self.server.deliver_trigger_frame(cx, frame)?;
381        self.state
382            .lock()
383            .map_err(|_| Error::PoisonedLock("trigger state"))?
384            .delivered += 1;
385        Ok(())
386    }
387
388    fn decode_event(&self, cx: &mut Cx, raw: &[u8]) -> Result<Expr> {
389        match &self.decoder {
390            TriggerDecoder::Codec(codec) => {
391                decode_frame_payload(cx, codec, raw, trigger_read_policy(cx), Default::default())
392            }
393            TriggerDecoder::Callable(callable) => {
394                let text = String::from_utf8(raw.to_vec()).map_err(|_| {
395                    Error::Eval(
396                        "trigger event bytes must be valid utf-8 for callable decoders".to_owned(),
397                    )
398                })?;
399                let arg = cx.factory().string(text)?;
400                let value = cx.call_value(callable.clone(), Args::new(vec![arg]))?;
401                value.object().as_expr(cx)
402            }
403        }
404    }
405
406    fn run(self: Arc<Self>, cx: &mut Cx) {
407        while !self.stopping.load(Ordering::Relaxed) {
408            match self.poll(cx) {
409                Ok(_) => {}
410                Err(_) => break,
411            }
412            let source_closed = self
413                .state
414                .lock()
415                .map(|state| state.source_closed)
416                .unwrap_or(true);
417            if source_closed {
418                break;
419            }
420            thread::sleep(self.poll_interval());
421        }
422    }
423
424    fn poll_interval(&self) -> Duration {
425        match self.source {
426            ServerAddress::Cron { .. } => Duration::from_millis(250),
427            _ => Duration::from_millis(25),
428        }
429    }
430}