Skip to main content

sim_lib_server/
trigger.rs

1use std::{
2    sync::{
3        Arc, Mutex,
4        atomic::{AtomicBool, AtomicU64},
5        mpsc::Receiver,
6    },
7    thread::JoinHandle,
8    time::Duration,
9};
10
11use sim_citizen_derive::non_citizen;
12use sim_kernel::{
13    ClassRef, Cx, Error, Expr, Object, ReadPolicy, Result, Symbol, TrustLevel, Value,
14    read_construct_capability, read_eval_capability,
15};
16
17use crate::{Server, ServerAddress, cron::CronMatcher, ensure_installed_codec};
18
19mod runtime;
20mod source_options;
21mod sources;
22
23use source_options::source_loopback;
24use sources::{TriggerSource, build_network_source, source_capability};
25#[cfg(test)]
26use sources::{loopback_smtp_messages, queued_trigger_events};
27
/// Process-wide monotonically usable counter, initialised to 1.
///
/// NOTE(review): presumably the source of `TriggerHandle::id` values; the
/// code that advances it is outside this part of the file — confirm there.
static NEXT_TRIGGER_ID: AtomicU64 = AtomicU64::new(1);
29
30#[derive(Clone)]
31enum TriggerDecoder {
32    Codec(Symbol),
33    Callable(Value),
34}
35
36struct TriggerConfig {
37    source: ServerAddress,
38    source_expr: Expr,
39    role: Option<Symbol>,
40    codec: Symbol,
41    decode_expr: Expr,
42    decoder: TriggerDecoder,
43    cron: Option<CronMatcher>,
44    network_source: Option<Box<dyn TriggerSource>>,
45}
46
/// Mutable bookkeeping kept per trigger; every field starts at its `Default`
/// value (zero, empty, `false`, `None`).
///
/// NOTE(review): the code that reads and writes these fields is not in this
/// part of the file, so the per-field notes below are assumptions to confirm.
#[derive(Default)]
struct TriggerState {
    /// Presumably how far into a file source has been consumed.
    file_offset: usize,
    /// Presumably bytes read from a file source but not yet delivered.
    file_remainder: Vec<u8>,
    /// Presumably a count of delivered events.
    delivered: u64,
    /// Presumably set once the source reports it has ended.
    source_closed: bool,
    /// Presumably the last minute a cron trigger fired for — unit/epoch unconfirmed.
    last_cron_minute: Option<u64>,
}
55
/// Where a trigger obtains stdin payloads from, if anywhere.
enum StdinSource {
    /// Receiving end of a channel carrying optional byte chunks, behind a mutex.
    ///
    /// NOTE(review): a `None` message presumably signals end of input — confirm
    /// against the producer, which is not visible here.
    Channel(Mutex<Receiver<Option<Vec<u8>>>>),
    /// No stdin channel is attached to this trigger.
    Unavailable,
}
60
61#[non_citizen(
62    reason = "live trigger handle; reconstruct source through server/Address descriptor and trigger ops",
63    kind = "handle"
64)]
65/// Live handle to a running trigger that delivers source events into a server.
66pub struct TriggerHandle {
67    id: u64,
68    server: Arc<Server>,
69    source: ServerAddress,
70    source_expr: Expr,
71    role: Option<Symbol>,
72    codec: Symbol,
73    decode_expr: Expr,
74    decoder: TriggerDecoder,
75    cron: Option<CronMatcher>,
76    network_source: Option<Mutex<Box<dyn TriggerSource>>>,
77    stopping: AtomicBool,
78    handle: Mutex<Option<JoinHandle<()>>>,
79    stdin: StdinSource,
80    state: Mutex<TriggerState>,
81}
82
/// Test-only helper: appends `payload` to the in-memory event queue keyed by
/// `source`.
///
/// The queue key is `"<kind>:<fields joined by ':'>"` for webhook, imap,
/// smtp, telegram and matrix addresses.
///
/// # Errors
///
/// * `Error::Eval` when `source` is any other address kind.
/// * `Error::PoisonedLock("trigger queue")` when the queue mutex is poisoned.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn enqueue_trigger_event(source: &ServerAddress, payload: Vec<u8>) -> Result<()> {
    let queue_key = match source {
        ServerAddress::Webhook { route } => format!("webhook:{route}"),
        ServerAddress::Imap { address, mailbox } => format!("imap:{address}:{mailbox}"),
        ServerAddress::Smtp { address } => format!("smtp:{address}"),
        ServerAddress::Telegram { chat_id, bot } => format!("telegram:{chat_id}:{bot}"),
        ServerAddress::Matrix { room_id } => format!("matrix:{room_id}"),
        // Every remaining address kind has no queue to push into.
        _ => {
            return Err(Error::Eval(
                "source does not use the queued trigger runtime".to_owned(),
            ));
        }
    };
    let mut pending = queued_trigger_events()
        .lock()
        .map_err(|_| Error::PoisonedLock("trigger queue"))?;
    pending.entry(queue_key).or_default().push(payload);
    Ok(())
}
101
/// Test-only helper: returns a copy of the messages recorded under the
/// `"smtp:<address>"` key, or an empty list when nothing is recorded.
///
/// # Errors
///
/// * `Error::Eval` when `source` is not an SMTP address.
/// * `Error::PoisonedLock("loopback smtp messages")` when the store's mutex
///   is poisoned.
#[cfg(test)]
pub(crate) fn loopback_smtp_messages_for(source: &ServerAddress) -> Result<Vec<Vec<u8>>> {
    let address = match source {
        ServerAddress::Smtp { address } => address,
        _ => {
            return Err(Error::Eval(
                "source does not use the loopback smtp runtime".to_owned(),
            ));
        }
    };
    let store_key = format!("smtp:{address}");
    let recorded = loopback_smtp_messages()
        .lock()
        .map_err(|_| Error::PoisonedLock("loopback smtp messages"))?;
    match recorded.get(&store_key) {
        Some(found) => Ok(found.clone()),
        None => Ok(Vec::new()),
    }
}
115
116impl Object for TriggerHandle {
117    fn display(&self, _cx: &mut Cx) -> Result<String> {
118        Ok("#<server-trigger>".to_owned())
119    }
120
121    fn as_any(&self) -> &dyn std::any::Any {
122        self
123    }
124}
125
126impl sim_kernel::ObjectCompat for TriggerHandle {
127    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
128        cx.factory().class_stub(
129            sim_kernel::ClassId(0),
130            Symbol::qualified("server", "Trigger"),
131        )
132    }
133    fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
134        self.reflect_value(cx)?.object().as_expr(cx)
135    }
136    fn as_table(&self, cx: &mut Cx) -> Result<Value> {
137        self.reflect_value(cx)
138    }
139}
140
141pub(crate) fn register_trigger(
142    cx: &mut Cx,
143    server: Arc<Server>,
144    source_expr: Expr,
145    decode_expr: Expr,
146    role: Option<Symbol>,
147    codec: Symbol,
148) -> Result<Arc<TriggerHandle>> {
149    let source_expr = literal_expr(&source_expr).clone();
150    let source = ServerAddress::from_expr(&source_expr)?;
151    if let Some(capability) = source_capability(&source) {
152        cx.require(&capability)?;
153    }
154    let cron = match &source {
155        ServerAddress::Cron { spec } => Some(CronMatcher::parse(spec)?),
156        _ => None,
157    };
158    let network_source = build_network_source(&source, &source_expr)?;
159
160    ensure_installed_codec(cx, &codec)?;
161    let decoder = build_decoder(cx, &decode_expr)?;
162    let trigger = Arc::new(TriggerHandle::new(
163        server.clone(),
164        TriggerConfig {
165            source,
166            source_expr,
167            role,
168            codec,
169            decode_expr,
170            decoder,
171            cron,
172            network_source,
173        },
174    ));
175    server.register_trigger(trigger.clone())?;
176    trigger.start(cx)?;
177    Ok(trigger)
178}
179
180fn build_decoder(cx: &mut Cx, decode_expr: &Expr) -> Result<TriggerDecoder> {
181    let expr = literal_expr(decode_expr).clone();
182    if let Expr::Symbol(symbol) = &expr
183        && let Some(codec) = normalize_codec_symbol(cx, symbol)
184    {
185        ensure_installed_codec(cx, &codec)?;
186        return Ok(TriggerDecoder::Codec(codec));
187    }
188    let callable = cx.eval_expr(expr)?;
189    if callable.object().as_callable().is_none() {
190        return Err(Error::TypeMismatch {
191            expected: "callable or codec symbol",
192            found: "non-callable",
193        });
194    }
195    Ok(TriggerDecoder::Callable(callable))
196}
197
198fn normalize_codec_symbol(cx: &Cx, symbol: &Symbol) -> Option<Symbol> {
199    if cx.registry().codec_by_symbol(symbol).is_some() {
200        return Some(symbol.clone());
201    }
202    let qualified = Symbol::qualified("codec", symbol.name.to_string());
203    if cx.registry().codec_by_symbol(&qualified).is_some() {
204        return Some(qualified);
205    }
206    None
207}
208
209fn literal_expr(expr: &Expr) -> &Expr {
210    match expr {
211        Expr::Quote { expr, .. } => expr,
212        _ => expr,
213    }
214}
215
216fn trigger_read_policy(cx: &Cx) -> ReadPolicy {
217    let mut capabilities = sim_kernel::CapabilitySet::new();
218    if cx.require(&read_construct_capability()).is_ok() {
219        capabilities.insert(read_construct_capability());
220    }
221    if cx.require(&read_eval_capability()).is_ok() {
222        capabilities.insert(read_eval_capability());
223    }
224    ReadPolicy {
225        trust: TrustLevel::TrustedSource,
226        capabilities,
227    }
228}
229
230fn io_error_to_host(err: std::io::Error) -> Error {
231    Error::HostError(err.to_string())
232}
233
/// Fixed delivery timeout: 250 milliseconds.
fn delivery_timeout() -> Duration {
    const DELIVERY_TIMEOUT_MILLIS: u64 = 250;
    Duration::from_millis(DELIVERY_TIMEOUT_MILLIS)
}
237
238fn source_loopback_enabled(expr: &Expr) -> bool {
239    source_loopback(expr).unwrap_or(false)
240}