1use std::{
2 fs,
3 path::PathBuf,
4 sync::{
5 Arc, Mutex,
6 atomic::{AtomicBool, Ordering},
7 mpsc,
8 },
9 thread,
10 time::{Duration, SystemTime, UNIX_EPOCH},
11};
12
13use sim_kernel::{Args, Cx, Error, Expr, Result, Symbol, Value};
14
15use crate::{
16 Server, ServerAddress, ServerFrame, decode_frame_payload, helpers::clone_server_cx,
17 symbol_list_value,
18};
19
20#[cfg(not(test))]
21use super::sources::spawn_stdin_reader;
22#[cfg(test)]
23use super::sources::{remove_test_stdin_sender, test_stdin_sender, test_stdin_senders};
24use super::{
25 NEXT_TRIGGER_ID, StdinSource, TriggerConfig, TriggerDecoder, TriggerHandle, TriggerState,
26 delivery_timeout, io_error_to_host, source_capability, trigger_read_policy,
27};
28
29impl TriggerHandle {
30 pub(super) fn new(server: Arc<Server>, config: TriggerConfig) -> Self {
31 let TriggerConfig {
32 source,
33 source_expr,
34 role,
35 codec,
36 decode_expr,
37 decoder,
38 cron,
39 network_source,
40 } = config;
41 let id = NEXT_TRIGGER_ID.fetch_add(1, Ordering::Relaxed);
42 let stdin = match &source {
43 ServerAddress::Stdin => {
44 let (tx, rx) = mpsc::channel();
45 #[cfg(not(test))]
46 spawn_stdin_reader(tx);
47 #[cfg(test)]
48 {
49 test_stdin_senders()
50 .lock()
51 .expect("test stdin sender registry poisoned")
52 .insert(id, tx);
53 }
54 StdinSource::Channel(Mutex::new(rx))
55 }
56 _ => StdinSource::Unavailable,
57 };
58 Self {
59 id,
60 server,
61 source,
62 source_expr,
63 role,
64 codec,
65 decode_expr,
66 decoder,
67 cron,
68 network_source: network_source.map(Mutex::new),
69 stopping: AtomicBool::new(false),
70 handle: Mutex::new(None),
71 stdin,
72 state: Mutex::new(TriggerState::default()),
73 }
74 }
75
76 pub(crate) fn start(self: &Arc<Self>, seed: &Cx) -> Result<()> {
77 let mut handle = self
78 .handle
79 .lock()
80 .map_err(|_| Error::PoisonedLock("trigger handle"))?;
81 if handle.is_some() {
82 return Ok(());
83 }
84 let mut cx = clone_server_cx(seed);
85 let trigger = Arc::clone(self);
86 *handle = Some(thread::spawn(move || {
87 trigger.run(&mut cx);
88 }));
89 Ok(())
90 }
91
92 pub(crate) fn stop(&self) -> Result<()> {
93 self.stopping.store(true, Ordering::Relaxed);
94 #[cfg(test)]
95 remove_test_stdin_sender(self.id);
96 let join = self
97 .handle
98 .lock()
99 .map_err(|_| Error::PoisonedLock("trigger handle"))?
100 .take();
101 if let Some(join) = join {
102 join.join()
103 .map_err(|_| Error::HostError("trigger thread panicked".to_owned()))?;
104 }
105 Ok(())
106 }
107
108 pub(crate) fn poll(&self, cx: &mut Cx) -> Result<u64> {
109 match &self.source {
110 ServerAddress::Stdin => self.poll_stdin(cx),
111 ServerAddress::FileTail { path } => self.poll_file_tail(cx, path),
112 ServerAddress::Cron { .. } => self.poll_cron(cx),
113 ServerAddress::Webhook { .. }
114 | ServerAddress::Imap { .. }
115 | ServerAddress::Smtp { .. }
116 | ServerAddress::Telegram { .. }
117 | ServerAddress::Matrix { .. } => self.poll_network_source(cx),
118 other => Err(Error::Eval(format!(
119 "server/trigger does not support source {}",
120 other.kind_symbol()
121 ))),
122 }
123 }
124
125 #[cfg(test)]
126 pub(crate) fn inject_text(&self, cx: &mut Cx, text: &str) -> Result<u64> {
127 let mut delivered = 0;
128 for line in text.lines() {
129 if line.trim().is_empty() {
130 continue;
131 }
132 self.inject_event(cx, line.as_bytes())?;
133 delivered += 1;
134 }
135 Ok(delivered)
136 }
137
138 #[cfg(test)]
139 pub(crate) fn feed_stdin(&self, text: &str) -> Result<()> {
140 let sender = test_stdin_sender(self.id)?;
141 for line in text.lines() {
142 sender
143 .send(Some(format!("{line}\n").into_bytes()))
144 .map_err(|_| Error::HostError("stdin trigger source closed".to_owned()))?;
145 }
146 Ok(())
147 }
148
149 #[cfg(test)]
150 pub(crate) fn finish_stdin(&self) -> Result<()> {
151 test_stdin_sender(self.id)?
152 .send(None)
153 .map_err(|_| Error::HostError("stdin trigger source closed".to_owned()))
154 }
155
156 #[cfg(test)]
157 pub(crate) fn is_source_closed(&self) -> bool {
158 self.state
159 .lock()
160 .map(|state| state.source_closed)
161 .unwrap_or(true)
162 }
163
164 #[cfg(all(test, feature = "trigger-webhook"))]
165 pub(crate) fn webhook_port(&self) -> Result<Option<u16>> {
166 let Some(source) = &self.network_source else {
167 return Ok(None);
168 };
169 let source = source
170 .lock()
171 .map_err(|_| Error::PoisonedLock("trigger source"))?;
172 let Some(source) = source
173 .as_any()
174 .downcast_ref::<super::sources::network::WebhookSource>()
175 else {
176 return Ok(None);
177 };
178 source.local_port().map(Some)
179 }
180
181 pub fn reflect_value(&self, cx: &mut Cx) -> Result<Value> {
186 let role = match &self.role {
187 Some(role) => cx.factory().symbol(role.clone())?,
188 None => cx.factory().nil()?,
189 };
190 let delivered = self
191 .state
192 .lock()
193 .map_err(|_| Error::PoisonedLock("trigger state"))?
194 .delivered;
195 let capabilities = match source_capability(&self.source) {
196 Some(capability) => symbol_list_value(cx, &[capability.as_symbol()])?,
197 None => cx.factory().list(Vec::new())?,
198 };
199 cx.factory().table(vec![
200 (
201 Symbol::new("kind"),
202 cx.factory().symbol(Symbol::new("trigger"))?,
203 ),
204 (Symbol::new("id"), cx.factory().string(self.id.to_string())?),
205 (
206 Symbol::new("source"),
207 cx.factory().expr(self.source_expr.clone())?,
208 ),
209 (Symbol::new("role"), role),
210 (
211 Symbol::new("codec"),
212 cx.factory().symbol(self.codec.clone())?,
213 ),
214 (
215 Symbol::new("decode"),
216 cx.factory().expr(self.decode_expr.clone())?,
217 ),
218 (
219 Symbol::new("delivered"),
220 cx.factory().string(delivered.to_string())?,
221 ),
222 (Symbol::new("requires"), capabilities),
223 ])
224 }
225
226 fn poll_file_tail(&self, cx: &mut Cx, path: &PathBuf) -> Result<u64> {
227 let mut bytes = fs::read(path).map_err(io_error_to_host)?;
228 let lines = {
229 let mut state = self
230 .state
231 .lock()
232 .map_err(|_| Error::PoisonedLock("trigger state"))?;
233 if state.file_offset > bytes.len() {
234 state.file_offset = 0;
235 state.file_remainder.clear();
236 }
237 let mut pending = std::mem::take(&mut state.file_remainder);
238 pending.extend_from_slice(&bytes[state.file_offset..]);
239 state.file_offset = bytes.len();
240
241 let mut lines = Vec::new();
242 let mut start = 0usize;
243 for index in 0..pending.len() {
244 if pending[index] == b'\n' {
245 let mut line = pending[start..index].to_vec();
246 if line.last() == Some(&b'\r') {
247 line.pop();
248 }
249 lines.push(line);
250 start = index + 1;
251 }
252 }
253 state.file_remainder = pending[start..].to_vec();
254 lines
255 };
256 bytes.clear();
257
258 let mut delivered = 0;
259 for line in lines {
260 if line.iter().all(u8::is_ascii_whitespace) {
261 continue;
262 }
263 self.inject_event(cx, &line)?;
264 delivered += 1;
265 }
266 Ok(delivered)
267 }
268
269 fn poll_stdin(&self, cx: &mut Cx) -> Result<u64> {
270 let receiver = match &self.stdin {
271 StdinSource::Channel(receiver) => receiver,
272 StdinSource::Unavailable => return Ok(0),
273 };
274 let mut delivered = 0;
275 loop {
276 match receiver
277 .lock()
278 .map_err(|_| Error::PoisonedLock("stdin trigger source"))?
279 .try_recv()
280 {
281 Ok(Some(mut line)) => {
282 if line.last() == Some(&b'\n') {
283 line.pop();
284 }
285 if line.last() == Some(&b'\r') {
286 line.pop();
287 }
288 if line.iter().all(u8::is_ascii_whitespace) {
289 continue;
290 }
291 self.inject_event(cx, &line)?;
292 delivered += 1;
293 }
294 Ok(None) | Err(mpsc::TryRecvError::Disconnected) => {
295 self.state
296 .lock()
297 .map_err(|_| Error::PoisonedLock("trigger state"))?
298 .source_closed = true;
299 break;
300 }
301 Err(mpsc::TryRecvError::Empty) => break,
302 }
303 }
304 Ok(delivered)
305 }
306
307 fn poll_network_source(&self, cx: &mut Cx) -> Result<u64> {
308 let Some(source) = &self.network_source else {
309 return Ok(0);
310 };
311 let mut delivered = 0;
312 loop {
313 let timeout = if delivered == 0 {
314 delivery_timeout()
315 } else {
316 Duration::from_millis(0)
317 };
318 let event = source
319 .lock()
320 .map_err(|_| Error::PoisonedLock("trigger source"))?
321 .next_event(cx, timeout)?;
322 let Some(event) = event else {
323 break;
324 };
325 self.inject_event(cx, &event)?;
326 source
327 .lock()
328 .map_err(|_| Error::PoisonedLock("trigger source"))?
329 .ack(cx)?;
330 delivered += 1;
331 }
332 Ok(delivered)
333 }
334
335 fn poll_cron(&self, cx: &mut Cx) -> Result<u64> {
336 let Some(matcher) = &self.cron else {
337 return Ok(0);
338 };
339 let Some(minute_key) = matcher.current_match(SystemTime::now()) else {
340 return Ok(0);
341 };
342 {
343 let mut state = self
344 .state
345 .lock()
346 .map_err(|_| Error::PoisonedLock("trigger state"))?;
347 if state.last_cron_minute == Some(minute_key) {
348 return Ok(0);
349 }
350 state.last_cron_minute = Some(minute_key);
351 }
352 self.inject_event(cx, b"tick")?;
353 Ok(1)
354 }
355
356 fn inject_event(&self, cx: &mut Cx, raw: &[u8]) -> Result<()> {
357 if let Some(capability) = source_capability(&self.source) {
358 cx.require(&capability)?;
359 }
360 let expr = self.decode_event(cx, raw)?;
361 let source = self.source.kind_symbol();
362 let when_ms = SystemTime::now()
363 .duration_since(UNIX_EPOCH)
364 .map(|duration| duration.as_millis() as u64)
365 .unwrap_or(0);
366 let mut frame = ServerFrame::from_expr(
367 cx,
368 self.codec.clone(),
369 crate::FrameKind::Trigger {
370 source: source.clone(),
371 when_ms,
372 },
373 &expr,
374 sim_kernel::Consistency::LocalFirst,
375 Vec::new(),
376 false,
377 )?;
378 frame.envelope.role = self.role.clone();
379 frame.envelope.trigger_source = Some(source);
380 self.server.deliver_trigger_frame(cx, frame)?;
381 self.state
382 .lock()
383 .map_err(|_| Error::PoisonedLock("trigger state"))?
384 .delivered += 1;
385 Ok(())
386 }
387
388 fn decode_event(&self, cx: &mut Cx, raw: &[u8]) -> Result<Expr> {
389 match &self.decoder {
390 TriggerDecoder::Codec(codec) => {
391 decode_frame_payload(cx, codec, raw, trigger_read_policy(cx), Default::default())
392 }
393 TriggerDecoder::Callable(callable) => {
394 let text = String::from_utf8(raw.to_vec()).map_err(|_| {
395 Error::Eval(
396 "trigger event bytes must be valid utf-8 for callable decoders".to_owned(),
397 )
398 })?;
399 let arg = cx.factory().string(text)?;
400 let value = cx.call_value(callable.clone(), Args::new(vec![arg]))?;
401 value.object().as_expr(cx)
402 }
403 }
404 }
405
406 fn run(self: Arc<Self>, cx: &mut Cx) {
407 while !self.stopping.load(Ordering::Relaxed) {
408 match self.poll(cx) {
409 Ok(_) => {}
410 Err(_) => break,
411 }
412 let source_closed = self
413 .state
414 .lock()
415 .map(|state| state.source_closed)
416 .unwrap_or(true);
417 if source_closed {
418 break;
419 }
420 thread::sleep(self.poll_interval());
421 }
422 }
423
424 fn poll_interval(&self) -> Duration {
425 match self.source {
426 ServerAddress::Cron { .. } => Duration::from_millis(250),
427 _ => Duration::from_millis(25),
428 }
429 }
430}