1use std::collections::HashMap;
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::protocol::RegisterTriggerInput;
8
9#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
12pub struct HttpTriggerConfig {
13 pub api_path: String,
15 #[serde(skip_serializing_if = "Option::is_none")]
17 pub http_method: Option<HttpMethod>,
18 #[serde(skip_serializing_if = "Option::is_none")]
20 pub condition_function_id: Option<String>,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
24#[serde(rename_all = "UPPERCASE")]
25pub enum HttpMethod {
26 Get,
27 Post,
28 Put,
29 Delete,
30 Patch,
31 Head,
32 Options,
33}
34
35impl HttpTriggerConfig {
36 pub fn new(api_path: impl Into<String>) -> Self {
37 Self {
38 api_path: api_path.into(),
39 http_method: None,
40 condition_function_id: None,
41 }
42 }
43
44 pub fn method(mut self, method: HttpMethod) -> Self {
45 self.http_method = Some(method);
46 self
47 }
48
49 pub fn condition(mut self, function_id: impl Into<String>) -> Self {
50 self.condition_function_id = Some(function_id.into());
51 self
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
58pub struct CronTriggerConfig {
59 pub expression: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub condition_function_id: Option<String>,
64}
65
66impl CronTriggerConfig {
67 pub fn new(expression: impl Into<String>) -> Self {
68 Self {
69 expression: expression.into(),
70 condition_function_id: None,
71 }
72 }
73
74 pub fn condition(mut self, function_id: impl Into<String>) -> Self {
75 self.condition_function_id = Some(function_id.into());
76 self
77 }
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
83pub struct QueueTriggerConfig {
84 pub topic: String,
86 #[serde(skip_serializing_if = "Option::is_none")]
88 pub condition_function_id: Option<String>,
89 #[serde(skip_serializing_if = "Option::is_none")]
91 pub queue_config: Option<Value>,
92}
93
94impl QueueTriggerConfig {
95 pub fn new(topic: impl Into<String>) -> Self {
96 Self {
97 topic: topic.into(),
98 condition_function_id: None,
99 queue_config: None,
100 }
101 }
102
103 pub fn condition(mut self, function_id: impl Into<String>) -> Self {
104 self.condition_function_id = Some(function_id.into());
105 self
106 }
107
108 pub fn queue_config(mut self, config: impl Serialize) -> Result<Self, serde_json::Error> {
109 self.queue_config = Some(serde_json::to_value(config)?);
110 Ok(self)
111 }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
117pub struct SubscribeTriggerConfig {
118 pub topic: String,
120 #[serde(skip_serializing_if = "Option::is_none")]
122 pub condition_function_id: Option<String>,
123}
124
125impl SubscribeTriggerConfig {
126 pub fn new(topic: impl Into<String>) -> Self {
127 Self {
128 topic: topic.into(),
129 condition_function_id: None,
130 }
131 }
132
133 pub fn condition(mut self, function_id: impl Into<String>) -> Self {
134 self.condition_function_id = Some(function_id.into());
135 self
136 }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
142pub struct StateTriggerConfig {
143 #[serde(skip_serializing_if = "Option::is_none")]
145 pub scope: Option<String>,
146 #[serde(skip_serializing_if = "Option::is_none")]
148 pub key: Option<String>,
149 #[serde(skip_serializing_if = "Option::is_none")]
151 pub condition_function_id: Option<String>,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
155pub enum StateEventType {
156 #[serde(rename = "state:created")]
157 Created,
158 #[serde(rename = "state:updated")]
159 Updated,
160 #[serde(rename = "state:deleted")]
161 Deleted,
162}
163
164impl StateTriggerConfig {
165 pub fn new() -> Self {
166 Self {
167 scope: None,
168 key: None,
169 condition_function_id: None,
170 }
171 }
172
173 pub fn scope(mut self, scope: impl Into<String>) -> Self {
174 self.scope = Some(scope.into());
175 self
176 }
177
178 pub fn key(mut self, key: impl Into<String>) -> Self {
179 self.key = Some(key.into());
180 self
181 }
182
183 pub fn condition(mut self, function_id: impl Into<String>) -> Self {
184 self.condition_function_id = Some(function_id.into());
185 self
186 }
187}
188
189impl Default for StateTriggerConfig {
190 fn default() -> Self {
191 Self::new()
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
198pub struct StreamJoinLeaveTriggerConfig {
199 #[serde(skip_serializing_if = "Option::is_none")]
201 pub stream_name: Option<String>,
202 #[serde(skip_serializing_if = "Option::is_none")]
204 pub condition_function_id: Option<String>,
205}
206
207impl StreamJoinLeaveTriggerConfig {
208 pub fn new() -> Self {
209 Self {
210 stream_name: None,
211 condition_function_id: None,
212 }
213 }
214
215 pub fn stream_name(mut self, name: impl Into<String>) -> Self {
216 self.stream_name = Some(name.into());
217 self
218 }
219
220 pub fn condition(mut self, function_id: impl Into<String>) -> Self {
221 self.condition_function_id = Some(function_id.into());
222 self
223 }
224}
225
226impl Default for StreamJoinLeaveTriggerConfig {
227 fn default() -> Self {
228 Self::new()
229 }
230}
231
232#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
233pub struct StreamTriggerConfig {
234 #[serde(skip_serializing_if = "Option::is_none")]
236 pub stream_name: Option<String>,
237 #[serde(skip_serializing_if = "Option::is_none")]
239 pub group_id: Option<String>,
240 #[serde(skip_serializing_if = "Option::is_none")]
242 pub item_id: Option<String>,
243 #[serde(skip_serializing_if = "Option::is_none")]
245 pub condition_function_id: Option<String>,
246}
247
248impl StreamTriggerConfig {
249 pub fn new() -> Self {
250 Self {
251 stream_name: None,
252 group_id: None,
253 item_id: None,
254 condition_function_id: None,
255 }
256 }
257
258 pub fn stream_name(mut self, name: impl Into<String>) -> Self {
259 self.stream_name = Some(name.into());
260 self
261 }
262
263 pub fn group_id(mut self, id: impl Into<String>) -> Self {
264 self.group_id = Some(id.into());
265 self
266 }
267
268 pub fn item_id(mut self, id: impl Into<String>) -> Self {
269 self.item_id = Some(id.into());
270 self
271 }
272
273 pub fn condition(mut self, function_id: impl Into<String>) -> Self {
274 self.condition_function_id = Some(function_id.into());
275 self
276 }
277}
278
279impl Default for StreamTriggerConfig {
280 fn default() -> Self {
281 Self::new()
282 }
283}
284
285#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
288pub struct LogTriggerConfig {
289 #[serde(skip_serializing_if = "Option::is_none")]
291 pub level: Option<LogLevel>,
292}
293
294#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
295#[serde(rename_all = "lowercase")]
296pub enum LogLevel {
297 All,
298 Debug,
299 Info,
300 Warn,
301 Error,
302}
303
304impl LogTriggerConfig {
305 pub fn new() -> Self {
306 Self { level: None }
307 }
308
309 pub fn level(mut self, level: LogLevel) -> Self {
310 self.level = Some(level);
311 self
312 }
313}
314
315impl Default for LogTriggerConfig {
316 fn default() -> Self {
317 Self::new()
318 }
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
324pub struct HttpCallRequest {
325 pub query_params: HashMap<String, String>,
326 pub path_params: HashMap<String, String>,
327 pub headers: HashMap<String, String>,
328 pub path: String,
329 pub method: String,
330 pub body: Value,
331}
332
333#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
334pub struct CronCallRequest {
335 pub trigger: String,
336 pub job_id: String,
337 pub scheduled_time: String,
338 pub actual_time: String,
339}
340
341#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
342pub struct StateCallRequest {
343 #[serde(rename = "type")]
344 pub message_type: String,
345 pub event_type: StateEventType,
346 pub scope: String,
347 pub key: String,
348 pub old_value: Option<Value>,
349 pub new_value: Value,
350}
351
352#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
353pub struct StreamJoinLeaveCallRequest {
354 pub subscription_id: String,
355 pub stream_name: String,
356 pub group_id: String,
357 pub id: Option<String>,
358 pub context: Option<Value>,
359}
360
361#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
363#[serde(rename_all = "lowercase")]
364pub enum StreamEventType {
365 Create,
366 Update,
367 Delete,
368}
369
370#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
372pub struct StreamEventDetail {
373 #[serde(rename = "type")]
375 pub event_type: StreamEventType,
376 pub data: Value,
378}
379
380#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
383pub struct StreamCallRequest {
384 #[serde(rename = "type")]
386 pub event_type: String,
387 pub timestamp: i64,
389 #[serde(rename = "streamName")]
391 pub stream_name: String,
392 #[serde(rename = "groupId")]
394 pub group_id: String,
395 pub id: Option<String>,
397 pub event: StreamEventDetail,
399}
400
401#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
402pub struct LogCallRequest {
403 pub timestamp_unix_nano: u64,
404 pub observed_timestamp_unix_nano: u64,
405 pub severity_number: u32,
406 pub severity_text: String,
407 pub body: String,
408 pub attributes: Value,
409 pub trace_id: String,
410 pub span_id: String,
411 pub resource: Value,
412 pub service_name: String,
413 pub instrumentation_scope_name: String,
414 pub instrumentation_scope_version: String,
415}
416
417#[derive(Debug, Clone, Serialize)]
428#[serde(untagged)]
429pub enum IIITrigger {
430 Http(HttpTriggerConfig),
431 Cron(CronTriggerConfig),
432 Queue(QueueTriggerConfig),
433 Subscribe(SubscribeTriggerConfig),
434 State(StateTriggerConfig),
435 Stream(StreamTriggerConfig),
436 StreamJoin(StreamJoinLeaveTriggerConfig),
437 StreamLeave(StreamJoinLeaveTriggerConfig),
438 Log(LogTriggerConfig),
439}
440
441impl IIITrigger {
442 fn trigger_type_id(&self) -> &'static str {
443 match self {
444 Self::Http(_) => "http",
445 Self::Cron(_) => "cron",
446 Self::Queue(_) => "queue",
447 Self::Subscribe(_) => "subscribe",
448 Self::State(_) => "state",
449 Self::Stream(_) => "stream",
450 Self::StreamJoin(_) => "stream:join",
451 Self::StreamLeave(_) => "stream:leave",
452 Self::Log(_) => "log",
453 }
454 }
455
456 pub fn for_function(self, function_id: impl Into<String>) -> RegisterTriggerInput {
458 RegisterTriggerInput {
459 trigger_type: self.trigger_type_id().to_string(),
460 function_id: function_id.into(),
461 config: serde_json::to_value(&self).unwrap(),
462 metadata: None,
463 }
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The typed HTTP method must serialize as its upper-case name.
    #[test]
    fn http_trigger_config_serializes_typed_method_enum() {
        let http = HttpTriggerConfig::new("health").method(HttpMethod::Get);
        let wire = serde_json::to_value(http).expect("http trigger config should serialize");

        assert_eq!(wire["http_method"], "GET");
    }

    /// The typed log level must serialize as its lower-case name.
    #[test]
    fn log_trigger_config_serializes_typed_level_enum() {
        let log = LogTriggerConfig::new().level(LogLevel::Error);
        let wire = serde_json::to_value(log).expect("log trigger config should serialize");

        assert_eq!(wire["level"], "error");
    }

    /// `event_type` must deserialize from its `state:`-prefixed wire name.
    #[test]
    fn state_call_request_deserializes_typed_event_type() {
        let payload = json!({
            "type": "state",
            "event_type": "state:updated",
            "scope": "users",
            "key": "123",
            "old_value": { "name": "old" },
            "new_value": { "name": "new" }
        });

        let parsed: StateCallRequest =
            serde_json::from_value(payload).expect("state call request should deserialize");

        assert_eq!(parsed.event_type, StateEventType::Updated);
    }

    /// A failing `Serialize` impl must surface as `Err`, not as a panic.
    #[test]
    fn queue_config_returns_error_instead_of_panicking() {
        struct FailingSerialize;

        impl Serialize for FailingSerialize {
            fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
            where
                S: serde::Serializer,
            {
                Err(serde::ser::Error::custom("boom"))
            }
        }

        let outcome = QueueTriggerConfig::new("emails").queue_config(FailingSerialize);

        assert!(outcome.is_err(), "serialization failures should be returned");
    }

    /// The camelCase wire names and the nested typed event must both deserialize.
    #[test]
    fn stream_call_request_deserializes_typed_event() {
        let payload = json!({
            "type": "stream",
            "timestamp": 1700000000000_i64,
            "streamName": "chat",
            "groupId": "room-1",
            "id": "msg-42",
            "event": {
                "type": "create",
                "data": { "text": "hello" }
            }
        });

        let parsed: StreamCallRequest =
            serde_json::from_value(payload).expect("stream call request should deserialize");

        assert_eq!(parsed.event_type, "stream");
        assert_eq!(parsed.stream_name, "chat");
        assert_eq!(parsed.group_id, "room-1");
        assert_eq!(parsed.id.as_deref(), Some("msg-42"));
        assert_eq!(parsed.event.event_type, StreamEventType::Create);
        assert_eq!(parsed.event.data, json!({ "text": "hello" }));
    }

    /// Each stream event type must survive a serialize/deserialize round trip.
    #[test]
    fn stream_event_type_roundtrip() {
        let cases = [
            (StreamEventType::Create, "create"),
            (StreamEventType::Update, "update"),
            (StreamEventType::Delete, "delete"),
        ];

        for (variant, wire_name) in cases {
            let wire = serde_json::to_value(&variant).unwrap();
            assert_eq!(wire, wire_name);

            let decoded: StreamEventType = serde_json::from_value(wire).unwrap();
            assert_eq!(decoded, variant);
        }
    }

    /// The untagged enum must serialize to the bare join/leave config.
    #[test]
    fn stream_join_uses_dedicated_join_leave_config_shape() {
        let join_config = StreamJoinLeaveTriggerConfig::new().stream_name("chat");
        let registration = IIITrigger::StreamJoin(join_config).for_function("example::on_join");

        assert_eq!(registration.config, json!({ "stream_name": "chat" }));
    }
}