Skip to main content

iii_sdk/
builtin_triggers.rs

1use std::collections::HashMap;
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::protocol::RegisterTriggerInput;
8
9// ── HTTP ────────────────────────────────────────────────────────────────
10
// Configuration payload for an HTTP trigger; carried by `IIITrigger::Http`.
// Only `api_path` is required. The two optional fields are omitted from the
// serialized form entirely while they are `None`.
// NOTE(review): the `///` comments below are visible to the `JsonSchema`
// derive — presumably surfaced as schema descriptions; confirm before
// rewording them.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct HttpTriggerConfig {
    /// HTTP endpoint path (e.g. `/users/:id`)
    pub api_path: String,
    /// HTTP method (defaults to GET)
    // No default is applied in this file: `None` is simply left out of the
    // output, so the GET default must come from the consumer — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub http_method: Option<HttpMethod>,
    /// Optional function ID to evaluate before invoking handler
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition_function_id: Option<String>,
}
22
// HTTP methods accepted by `HttpTriggerConfig::http_method`.
// `rename_all = "UPPERCASE"` makes each variant (de)serialize as its
// upper-case name, e.g. `HttpMethod::Get` <-> "GET".
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethod {
    Get,
    Post,
    Put,
    Delete,
    Patch,
    Head,
    Options,
}
34
35impl HttpTriggerConfig {
36    pub fn new(api_path: impl Into<String>) -> Self {
37        Self {
38            api_path: api_path.into(),
39            http_method: None,
40            condition_function_id: None,
41        }
42    }
43
44    pub fn method(mut self, method: HttpMethod) -> Self {
45        self.http_method = Some(method);
46        self
47    }
48
49    pub fn condition(mut self, function_id: impl Into<String>) -> Self {
50        self.condition_function_id = Some(function_id.into());
51        self
52    }
53}
54
55// ── Cron ────────────────────────────────────────────────────────────────
56
// Configuration payload for a cron trigger; carried by `IIITrigger::Cron`.
// `expression` is stored verbatim — nothing in this file parses or validates
// it. `condition_function_id` is omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct CronTriggerConfig {
    /// Cron expression (6-field format: sec min hour day month weekday)
    pub expression: String,
    /// Optional function ID to evaluate before invoking handler
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition_function_id: Option<String>,
}
65
66impl CronTriggerConfig {
67    pub fn new(expression: impl Into<String>) -> Self {
68        Self {
69            expression: expression.into(),
70            condition_function_id: None,
71        }
72    }
73
74    pub fn condition(mut self, function_id: impl Into<String>) -> Self {
75        self.condition_function_id = Some(function_id.into());
76        self
77    }
78}
79
80// ── Queue ───────────────────────────────────────────────────────────────
81
// Configuration payload for a queue trigger; carried by `IIITrigger::Queue`.
// `topic` is required; both optional fields are omitted from the serialized
// form while they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct QueueTriggerConfig {
    /// Queue topic to subscribe to
    pub topic: String,
    /// Optional function ID to evaluate before invoking handler
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition_function_id: Option<String>,
    /// Queue-specific subscriber configuration
    // Free-form JSON: this file imposes no schema on it. The `queue_config`
    // builder method fills it from any `Serialize` value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub queue_config: Option<Value>,
}
93
94impl QueueTriggerConfig {
95    pub fn new(topic: impl Into<String>) -> Self {
96        Self {
97            topic: topic.into(),
98            condition_function_id: None,
99            queue_config: None,
100        }
101    }
102
103    pub fn condition(mut self, function_id: impl Into<String>) -> Self {
104        self.condition_function_id = Some(function_id.into());
105        self
106    }
107
108    pub fn queue_config(mut self, config: impl Serialize) -> Result<Self, serde_json::Error> {
109        self.queue_config = Some(serde_json::to_value(config)?);
110        Ok(self)
111    }
112}
113
114// ── PubSub (subscribe) ─────────────────────────────────────────────────
115
// Configuration payload for a pub/sub subscription trigger; carried by
// `IIITrigger::Subscribe`. `condition_function_id` is omitted from the
// serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct SubscribeTriggerConfig {
    /// Topic to subscribe to
    pub topic: String,
    /// Optional function ID to evaluate before invoking handler
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition_function_id: Option<String>,
}
124
125impl SubscribeTriggerConfig {
126    pub fn new(topic: impl Into<String>) -> Self {
127        Self {
128            topic: topic.into(),
129            condition_function_id: None,
130        }
131    }
132
133    pub fn condition(mut self, function_id: impl Into<String>) -> Self {
134        self.condition_function_id = Some(function_id.into());
135        self
136    }
137}
138
139// ── State ───────────────────────────────────────────────────────────────
140
// Configuration payload for a state-change trigger; carried by
// `IIITrigger::State`. Every field is optional and omitted from the
// serialized form when `None`, so an untouched config serializes to `{}`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StateTriggerConfig {
    /// State scope to watch (exact match filter)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scope: Option<String>,
    /// State key to watch (exact match filter)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub key: Option<String>,
    /// Optional function ID to evaluate before invoking handler
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition_function_id: Option<String>,
}
153
// Kind of state change reported in `StateCallRequest::event_type`.
// Each variant carries an explicit wire name with a `state:` prefix, so the
// Rust identifiers never appear in the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
pub enum StateEventType {
    #[serde(rename = "state:created")]
    Created,
    #[serde(rename = "state:updated")]
    Updated,
    #[serde(rename = "state:deleted")]
    Deleted,
}
163
164impl StateTriggerConfig {
165    pub fn new() -> Self {
166        Self {
167            scope: None,
168            key: None,
169            condition_function_id: None,
170        }
171    }
172
173    pub fn scope(mut self, scope: impl Into<String>) -> Self {
174        self.scope = Some(scope.into());
175        self
176    }
177
178    pub fn key(mut self, key: impl Into<String>) -> Self {
179        self.key = Some(key.into());
180        self
181    }
182
183    pub fn condition(mut self, function_id: impl Into<String>) -> Self {
184        self.condition_function_id = Some(function_id.into());
185        self
186    }
187}
188
189impl Default for StateTriggerConfig {
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
195// ── Stream ──────────────────────────────────────────────────────────────
196
// Configuration payload shared by the stream join and stream leave triggers
// (`IIITrigger::StreamJoin` and `IIITrigger::StreamLeave` both wrap it).
// Both fields are optional and omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StreamJoinLeaveTriggerConfig {
    /// Stream name to watch
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_name: Option<String>,
    /// Optional function ID to evaluate before invoking handler
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition_function_id: Option<String>,
}
206
207impl StreamJoinLeaveTriggerConfig {
208    pub fn new() -> Self {
209        Self {
210            stream_name: None,
211            condition_function_id: None,
212        }
213    }
214
215    pub fn stream_name(mut self, name: impl Into<String>) -> Self {
216        self.stream_name = Some(name.into());
217        self
218    }
219
220    pub fn condition(mut self, function_id: impl Into<String>) -> Self {
221        self.condition_function_id = Some(function_id.into());
222        self
223    }
224}
225
226impl Default for StreamJoinLeaveTriggerConfig {
227    fn default() -> Self {
228        Self::new()
229    }
230}
231
// Configuration payload for a stream item-change trigger; carried by
// `IIITrigger::Stream`. Every field is optional and omitted from the
// serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StreamTriggerConfig {
    /// Stream name to watch
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_name: Option<String>,
    /// Group ID filter
    #[serde(skip_serializing_if = "Option::is_none")]
    pub group_id: Option<String>,
    /// Item ID filter
    #[serde(skip_serializing_if = "Option::is_none")]
    pub item_id: Option<String>,
    /// Optional function ID to evaluate before invoking handler
    #[serde(skip_serializing_if = "Option::is_none")]
    pub condition_function_id: Option<String>,
}
247
248impl StreamTriggerConfig {
249    pub fn new() -> Self {
250        Self {
251            stream_name: None,
252            group_id: None,
253            item_id: None,
254            condition_function_id: None,
255        }
256    }
257
258    pub fn stream_name(mut self, name: impl Into<String>) -> Self {
259        self.stream_name = Some(name.into());
260        self
261    }
262
263    pub fn group_id(mut self, id: impl Into<String>) -> Self {
264        self.group_id = Some(id.into());
265        self
266    }
267
268    pub fn item_id(mut self, id: impl Into<String>) -> Self {
269        self.item_id = Some(id.into());
270        self
271    }
272
273    pub fn condition(mut self, function_id: impl Into<String>) -> Self {
274        self.condition_function_id = Some(function_id.into());
275        self
276    }
277}
278
279impl Default for StreamTriggerConfig {
280    fn default() -> Self {
281        Self::new()
282    }
283}
284
285// ── Log ─────────────────────────────────────────────────────────────────
286
// Configuration payload for a log trigger; carried by `IIITrigger::Log`.
// Unlike the other trigger configs in this file it has no
// `condition_function_id` field. `level` is omitted from the serialized form
// when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct LogTriggerConfig {
    /// Minimum log level to trigger on
    #[serde(skip_serializing_if = "Option::is_none")]
    pub level: Option<LogLevel>,
}
293
// Log levels accepted by `LogTriggerConfig::level`.
// `rename_all = "lowercase"` makes each variant (de)serialize as its
// lower-case name, e.g. `LogLevel::Error` <-> "error".
// NOTE(review): no ordering traits are derived, so the "minimum level"
// comparison is not performed by this type — presumably done by the consumer.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
    All,
    Debug,
    Info,
    Warn,
    Error,
}
303
304impl LogTriggerConfig {
305    pub fn new() -> Self {
306        Self { level: None }
307    }
308
309    pub fn level(mut self, level: LogLevel) -> Self {
310        self.level = Some(level);
311        self
312    }
313}
314
315impl Default for LogTriggerConfig {
316    fn default() -> Self {
317        Self::new()
318    }
319}
320
321// ── Call request types ──────────────────────────────────────────────────
322
// Payload shape for a call made through an HTTP trigger (per the section
// heading "Call request types"). All fields are required on deserialization;
// none has a serde default.
// Note that `method` is a plain `String` here, not the `HttpMethod` enum used
// by `HttpTriggerConfig`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct HttpCallRequest {
    pub query_params: HashMap<String, String>,
    pub path_params: HashMap<String, String>,
    pub headers: HashMap<String, String>,
    pub path: String,
    pub method: String,
    // Arbitrary JSON; no schema is imposed in this file.
    pub body: Value,
}
332
// Payload shape for a call made through a cron trigger. All four fields are
// required strings.
// NOTE(review): the timestamp fields are carried as strings; their format is
// not shown in this file — confirm with the producer.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct CronCallRequest {
    pub trigger: String,
    pub job_id: String,
    pub scheduled_time: String,
    pub actual_time: String,
}
340
// Payload shape for a call made through a state trigger.
// `message_type` travels under the JSON key `type`; every other field uses
// its Rust name as the key.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StateCallRequest {
    #[serde(rename = "type")]
    pub message_type: String,
    // One of "state:created" / "state:updated" / "state:deleted" on the wire.
    pub event_type: StateEventType,
    pub scope: String,
    pub key: String,
    // `old_value` is optional while `new_value` is not wrapped in `Option`.
    // NOTE(review): confirm what producers send as `new_value` for
    // `state:deleted` events.
    pub old_value: Option<Value>,
    pub new_value: Value,
}
351
// Payload shape for a call made through a stream join/leave trigger.
// Keys keep their snake_case Rust names on the wire (no renames here),
// unlike `StreamCallRequest`, which renames to `streamName` / `groupId`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StreamJoinLeaveCallRequest {
    pub subscription_id: String,
    pub stream_name: String,
    pub group_id: String,
    pub id: Option<String>,
    // Arbitrary optional JSON; its contents are not defined in this file.
    pub context: Option<Value>,
}
360
/// The kind of mutation that occurred on a stream item.
// `rename_all = "lowercase"` makes each variant (de)serialize as its
// lower-case name: "create", "update", "delete".
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum StreamEventType {
    Create,
    Update,
    Delete,
}
369
/// Detail of a stream change event containing the mutation type and data.
// Appears as the `event` field of `StreamCallRequest`. `event_type` travels
// under the JSON key `type`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StreamEventDetail {
    /// The kind of mutation (create, update, or delete).
    #[serde(rename = "type")]
    pub event_type: StreamEventType,
    /// The data associated with the event.
    pub data: Value,
}
379
/// Handler input for `stream` triggers, fired when an item changes
/// via `stream::set`, `stream::update`, or `stream::delete`.
// Wire keys are mixed: `type`, `streamName` and `groupId` come from explicit
// renames, while `timestamp`, `id` and `event` use the Rust field names.
// The top-level `type` (a plain string) is distinct from the nested
// `event.type`, which is the typed `StreamEventType`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StreamCallRequest {
    /// Always `"stream"`.
    #[serde(rename = "type")]
    pub event_type: String,
    /// Unix timestamp (milliseconds) of the event.
    pub timestamp: i64,
    /// The stream where the change occurred.
    #[serde(rename = "streamName")]
    pub stream_name: String,
    /// The group where the change occurred.
    #[serde(rename = "groupId")]
    pub group_id: String,
    /// The item ID that changed.
    pub id: Option<String>,
    /// The event detail containing mutation type and data.
    pub event: StreamEventDetail,
}
400
// Payload shape for a call made through a log trigger. Every field is
// required on deserialization; none is optional or defaulted.
// NOTE(review): the field names resemble the OpenTelemetry log record model
// (timestamps in Unix nanoseconds, severity number/text, trace/span IDs) —
// confirm against the producer before relying on that.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct LogCallRequest {
    pub timestamp_unix_nano: u64,
    pub observed_timestamp_unix_nano: u64,
    pub severity_number: u32,
    pub severity_text: String,
    pub body: String,
    // Arbitrary JSON; no schema is imposed in this file.
    pub attributes: Value,
    pub trace_id: String,
    pub span_id: String,
    // Arbitrary JSON; no schema is imposed in this file.
    pub resource: Value,
    pub service_name: String,
    pub instrumentation_scope_name: String,
    pub instrumentation_scope_version: String,
}
416
417// ── IIITrigger enum ────────────────────────────────────────────────────
418
/// Enum of all built-in trigger types with typed configuration.
///
/// Use `.for_function()` to create a [`RegisterTriggerInput`]:
/// ```rust,no_run
/// # use iii_sdk::builtin_triggers::*;
/// let input = IIITrigger::Cron(CronTriggerConfig::new("0 * * * * *"))
///     .for_function("my::handler");
/// ```
///
/// The enum is serialized untagged: the output is the wrapped config's own
/// JSON with no marker for the variant. The trigger type is carried
/// separately, in the `trigger_type` field that `.for_function()` fills in.
// Only `Serialize` is derived — this type cannot be deserialized.
// `StreamJoin` and `StreamLeave` wrap the same config struct and differ only
// in the trigger type id they map to.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum IIITrigger {
    Http(HttpTriggerConfig),
    Cron(CronTriggerConfig),
    Queue(QueueTriggerConfig),
    Subscribe(SubscribeTriggerConfig),
    State(StateTriggerConfig),
    Stream(StreamTriggerConfig),
    StreamJoin(StreamJoinLeaveTriggerConfig),
    StreamLeave(StreamJoinLeaveTriggerConfig),
    Log(LogTriggerConfig),
}
440
441impl IIITrigger {
442    fn trigger_type_id(&self) -> &'static str {
443        match self {
444            Self::Http(_) => "http",
445            Self::Cron(_) => "cron",
446            Self::Queue(_) => "queue",
447            Self::Subscribe(_) => "subscribe",
448            Self::State(_) => "state",
449            Self::Stream(_) => "stream",
450            Self::StreamJoin(_) => "stream:join",
451            Self::StreamLeave(_) => "stream:leave",
452            Self::Log(_) => "log",
453        }
454    }
455
456    /// Create a [`RegisterTriggerInput`] binding this trigger to a function.
457    pub fn for_function(self, function_id: impl Into<String>) -> RegisterTriggerInput {
458        RegisterTriggerInput {
459            trigger_type: self.trigger_type_id().to_string(),
460            function_id: function_id.into(),
461            config: serde_json::to_value(&self).unwrap(),
462            metadata: None,
463        }
464    }
465}
466
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The typed HTTP method must reach the wire as its upper-case name.
    #[test]
    fn http_trigger_config_serializes_typed_method_enum() {
        let serialized =
            serde_json::to_value(HttpTriggerConfig::new("health").method(HttpMethod::Get))
                .expect("http trigger config should serialize");

        assert_eq!(serialized["http_method"], "GET");
    }

    /// The typed log level must reach the wire as its lower-case name.
    #[test]
    fn log_trigger_config_serializes_typed_level_enum() {
        let serialized = serde_json::to_value(LogTriggerConfig::new().level(LogLevel::Error))
            .expect("log trigger config should serialize");

        assert_eq!(serialized["level"], "error");
    }

    /// A `state:`-prefixed event name must map onto the matching enum variant.
    #[test]
    fn state_call_request_deserializes_typed_event_type() {
        let payload = json!({
            "type": "state",
            "event_type": "state:updated",
            "scope": "users",
            "key": "123",
            "old_value": { "name": "old" },
            "new_value": { "name": "new" }
        });

        let parsed: StateCallRequest =
            serde_json::from_value(payload).expect("state call request should deserialize");

        assert_eq!(parsed.event_type, StateEventType::Updated);
    }

    /// A failing `Serialize` impl must surface as `Err`, not as a panic.
    #[test]
    fn queue_config_returns_error_instead_of_panicking() {
        // Serializer stand-in that always reports an error.
        struct FailingSerialize;

        impl Serialize for FailingSerialize {
            fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
            where
                S: serde::Serializer,
            {
                Err(serde::ser::Error::custom("boom"))
            }
        }

        let outcome = QueueTriggerConfig::new("emails").queue_config(FailingSerialize);

        assert!(outcome.is_err(), "serialization failures should be returned");
    }

    /// A full stream payload must deserialize with camelCase keys and a typed
    /// nested event.
    #[test]
    fn stream_call_request_deserializes_typed_event() {
        let payload = json!({
            "type": "stream",
            "timestamp": 1700000000000_i64,
            "streamName": "chat",
            "groupId": "room-1",
            "id": "msg-42",
            "event": {
                "type": "create",
                "data": { "text": "hello" }
            }
        });

        let parsed: StreamCallRequest =
            serde_json::from_value(payload).expect("stream call request should deserialize");

        assert_eq!(parsed.event_type, "stream");
        assert_eq!(parsed.stream_name, "chat");
        assert_eq!(parsed.group_id, "room-1");
        assert_eq!(parsed.id.as_deref(), Some("msg-42"));
        assert_eq!(parsed.event.event_type, StreamEventType::Create);
        assert_eq!(parsed.event.data, json!({ "text": "hello" }));
    }

    /// Every stream event type must serialize to its lower-case name and
    /// deserialize back to the same variant.
    #[test]
    fn stream_event_type_roundtrip() {
        let cases = [
            (StreamEventType::Create, "create"),
            (StreamEventType::Update, "update"),
            (StreamEventType::Delete, "delete"),
        ];

        for (variant, wire_name) in cases {
            let encoded = serde_json::to_value(&variant).unwrap();
            assert_eq!(encoded, wire_name);

            let decoded: StreamEventType = serde_json::from_value(encoded).unwrap();
            assert_eq!(decoded, variant);
        }
    }

    /// The join trigger must serialize only the join/leave config fields.
    #[test]
    fn stream_join_uses_dedicated_join_leave_config_shape() {
        let join_config = StreamJoinLeaveTriggerConfig::new().stream_name("chat");
        let registration = IIITrigger::StreamJoin(join_config).for_function("example::on_join");

        assert_eq!(registration.config, json!({ "stream_name": "chat" }));
    }
}
566}