1use std::{
2 sync::{
3 Arc, Mutex,
4 atomic::{AtomicBool, AtomicU64},
5 mpsc::Receiver,
6 },
7 thread::JoinHandle,
8 time::Duration,
9};
10
11use sim_citizen_derive::non_citizen;
12use sim_kernel::{
13 ClassRef, Cx, Error, Expr, Object, ReadPolicy, Result, Symbol, TrustLevel, Value,
14 read_construct_capability, read_eval_capability,
15};
16
17use crate::{Server, ServerAddress, cron::CronMatcher, ensure_installed_codec};
18
19mod runtime;
20mod source_options;
21mod sources;
22
23use source_options::source_loopback;
24use sources::{TriggerSource, build_network_source, source_capability};
25#[cfg(test)]
26use sources::{loopback_smtp_messages, queued_trigger_events};
27
28static NEXT_TRIGGER_ID: AtomicU64 = AtomicU64::new(1);
29
30#[derive(Clone)]
31enum TriggerDecoder {
32 Codec(Symbol),
33 Callable(Value),
34}
35
36struct TriggerConfig {
37 source: ServerAddress,
38 source_expr: Expr,
39 role: Option<Symbol>,
40 codec: Symbol,
41 decode_expr: Expr,
42 decoder: TriggerDecoder,
43 cron: Option<CronMatcher>,
44 network_source: Option<Box<dyn TriggerSource>>,
45}
46
47#[derive(Default)]
48struct TriggerState {
49 file_offset: usize,
50 file_remainder: Vec<u8>,
51 delivered: u64,
52 source_closed: bool,
53 last_cron_minute: Option<u64>,
54}
55
56enum StdinSource {
57 Channel(Mutex<Receiver<Option<Vec<u8>>>>),
58 Unavailable,
59}
60
61#[non_citizen(
62 reason = "live trigger handle; reconstruct source through server/Address descriptor and trigger ops",
63 kind = "handle"
64)]
65pub struct TriggerHandle {
67 id: u64,
68 server: Arc<Server>,
69 source: ServerAddress,
70 source_expr: Expr,
71 role: Option<Symbol>,
72 codec: Symbol,
73 decode_expr: Expr,
74 decoder: TriggerDecoder,
75 cron: Option<CronMatcher>,
76 network_source: Option<Mutex<Box<dyn TriggerSource>>>,
77 stopping: AtomicBool,
78 handle: Mutex<Option<JoinHandle<()>>>,
79 stdin: StdinSource,
80 state: Mutex<TriggerState>,
81}
82
83#[cfg(test)]
84#[allow(dead_code)]
85pub(crate) fn enqueue_trigger_event(source: &ServerAddress, payload: Vec<u8>) -> Result<()> {
86 let key = match source {
87 ServerAddress::Webhook { route } => Some(format!("webhook:{route}")),
88 ServerAddress::Imap { address, mailbox } => Some(format!("imap:{address}:{mailbox}")),
89 ServerAddress::Smtp { address } => Some(format!("smtp:{address}")),
90 ServerAddress::Telegram { chat_id, bot } => Some(format!("telegram:{chat_id}:{bot}")),
91 ServerAddress::Matrix { room_id } => Some(format!("matrix:{room_id}")),
92 _ => None,
93 }
94 .ok_or_else(|| Error::Eval("source does not use the queued trigger runtime".to_owned()))?;
95 let mut queues = queued_trigger_events()
96 .lock()
97 .map_err(|_| Error::PoisonedLock("trigger queue"))?;
98 queues.entry(key).or_default().push(payload);
99 Ok(())
100}
101
102#[cfg(test)]
103pub(crate) fn loopback_smtp_messages_for(source: &ServerAddress) -> Result<Vec<Vec<u8>>> {
104 let ServerAddress::Smtp { address } = source else {
105 return Err(Error::Eval(
106 "source does not use the loopback smtp runtime".to_owned(),
107 ));
108 };
109 let key = format!("smtp:{address}");
110 let messages = loopback_smtp_messages()
111 .lock()
112 .map_err(|_| Error::PoisonedLock("loopback smtp messages"))?;
113 Ok(messages.get(&key).cloned().unwrap_or_default())
114}
115
116impl Object for TriggerHandle {
117 fn display(&self, _cx: &mut Cx) -> Result<String> {
118 Ok("#<server-trigger>".to_owned())
119 }
120
121 fn as_any(&self) -> &dyn std::any::Any {
122 self
123 }
124}
125
126impl sim_kernel::ObjectCompat for TriggerHandle {
127 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
128 cx.factory().class_stub(
129 sim_kernel::ClassId(0),
130 Symbol::qualified("server", "Trigger"),
131 )
132 }
133 fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
134 self.reflect_value(cx)?.object().as_expr(cx)
135 }
136 fn as_table(&self, cx: &mut Cx) -> Result<Value> {
137 self.reflect_value(cx)
138 }
139}
140
141pub(crate) fn register_trigger(
142 cx: &mut Cx,
143 server: Arc<Server>,
144 source_expr: Expr,
145 decode_expr: Expr,
146 role: Option<Symbol>,
147 codec: Symbol,
148) -> Result<Arc<TriggerHandle>> {
149 let source_expr = literal_expr(&source_expr).clone();
150 let source = ServerAddress::from_expr(&source_expr)?;
151 if let Some(capability) = source_capability(&source) {
152 cx.require(&capability)?;
153 }
154 let cron = match &source {
155 ServerAddress::Cron { spec } => Some(CronMatcher::parse(spec)?),
156 _ => None,
157 };
158 let network_source = build_network_source(&source, &source_expr)?;
159
160 ensure_installed_codec(cx, &codec)?;
161 let decoder = build_decoder(cx, &decode_expr)?;
162 let trigger = Arc::new(TriggerHandle::new(
163 server.clone(),
164 TriggerConfig {
165 source,
166 source_expr,
167 role,
168 codec,
169 decode_expr,
170 decoder,
171 cron,
172 network_source,
173 },
174 ));
175 server.register_trigger(trigger.clone())?;
176 trigger.start(cx)?;
177 Ok(trigger)
178}
179
180fn build_decoder(cx: &mut Cx, decode_expr: &Expr) -> Result<TriggerDecoder> {
181 let expr = literal_expr(decode_expr).clone();
182 if let Expr::Symbol(symbol) = &expr
183 && let Some(codec) = normalize_codec_symbol(cx, symbol)
184 {
185 ensure_installed_codec(cx, &codec)?;
186 return Ok(TriggerDecoder::Codec(codec));
187 }
188 let callable = cx.eval_expr(expr)?;
189 if callable.object().as_callable().is_none() {
190 return Err(Error::TypeMismatch {
191 expected: "callable or codec symbol",
192 found: "non-callable",
193 });
194 }
195 Ok(TriggerDecoder::Callable(callable))
196}
197
198fn normalize_codec_symbol(cx: &Cx, symbol: &Symbol) -> Option<Symbol> {
199 if cx.registry().codec_by_symbol(symbol).is_some() {
200 return Some(symbol.clone());
201 }
202 let qualified = Symbol::qualified("codec", symbol.name.to_string());
203 if cx.registry().codec_by_symbol(&qualified).is_some() {
204 return Some(qualified);
205 }
206 None
207}
208
209fn literal_expr(expr: &Expr) -> &Expr {
210 match expr {
211 Expr::Quote { expr, .. } => expr,
212 _ => expr,
213 }
214}
215
216fn trigger_read_policy(cx: &Cx) -> ReadPolicy {
217 let mut capabilities = sim_kernel::CapabilitySet::new();
218 if cx.require(&read_construct_capability()).is_ok() {
219 capabilities.insert(read_construct_capability());
220 }
221 if cx.require(&read_eval_capability()).is_ok() {
222 capabilities.insert(read_eval_capability());
223 }
224 ReadPolicy {
225 trust: TrustLevel::TrustedSource,
226 capabilities,
227 }
228}
229
230fn io_error_to_host(err: std::io::Error) -> Error {
231 Error::HostError(err.to_string())
232}
233
234fn delivery_timeout() -> Duration {
235 Duration::from_millis(250)
236}
237
238fn source_loopback_enabled(expr: &Expr) -> bool {
239 source_loopback(expr).unwrap_or(false)
240}