Skip to main content

harn_vm/triggers/event/
core.rs

1use std::collections::BTreeMap;
2
3use serde::{Deserialize, Deserializer, Serialize, Serializer};
4use serde_json::Value as JsonValue;
5use time::OffsetDateTime;
6use uuid::Uuid;
7
8use super::payloads::ProviderPayload;
9use crate::triggers::test_util::clock;
10
11fn serialize_optional_bytes_b64<S>(
12    value: &Option<Vec<u8>>,
13    serializer: S,
14) -> Result<S::Ok, S::Error>
15where
16    S: Serializer,
17{
18    use base64::Engine;
19
20    match value {
21        Some(bytes) => {
22            serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
23        }
24        None => serializer.serialize_none(),
25    }
26}
27
28fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
29where
30    D: Deserializer<'de>,
31{
32    use base64::Engine;
33
34    let encoded = Option::<String>::deserialize(deserializer)?;
35    encoded
36        .map(|text| {
37            base64::engine::general_purpose::STANDARD
38                .decode(text.as_bytes())
39                .map_err(serde::de::Error::custom)
40        })
41        .transpose()
42}
43
44#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
45#[serde(transparent)]
46pub struct TriggerEventId(pub String);
47
48impl TriggerEventId {
49    pub fn new() -> Self {
50        Self(format!("trigger_evt_{}", Uuid::now_v7()))
51    }
52}
53
54impl Default for TriggerEventId {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
61#[serde(transparent)]
62pub struct ProviderId(pub String);
63
64impl ProviderId {
65    pub fn new(value: impl Into<String>) -> Self {
66        Self(value.into())
67    }
68
69    pub fn as_str(&self) -> &str {
70        self.0.as_str()
71    }
72}
73
74impl From<&str> for ProviderId {
75    fn from(value: &str) -> Self {
76        Self::new(value)
77    }
78}
79
80impl From<String> for ProviderId {
81    fn from(value: String) -> Self {
82        Self::new(value)
83    }
84}
85
86#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
87#[serde(transparent)]
88pub struct TraceId(pub String);
89
90impl TraceId {
91    pub fn new() -> Self {
92        Self(format!("trace_{}", Uuid::now_v7()))
93    }
94}
95
96impl Default for TraceId {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
103#[serde(transparent)]
104pub struct TenantId(pub String);
105
106impl TenantId {
107    pub fn new(value: impl Into<String>) -> Self {
108        Self(value.into())
109    }
110}
111
112#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
113#[serde(tag = "state", rename_all = "snake_case")]
114pub enum SignatureStatus {
115    Verified,
116    Unsigned,
117    Failed { reason: String },
118}
119
120#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
121pub struct TriggerEvent {
122    pub id: TriggerEventId,
123    pub provider: ProviderId,
124    pub kind: String,
125    #[serde(with = "time::serde::rfc3339")]
126    pub received_at: OffsetDateTime,
127    #[serde(with = "time::serde::rfc3339::option")]
128    pub occurred_at: Option<OffsetDateTime>,
129    pub dedupe_key: String,
130    pub trace_id: TraceId,
131    pub tenant_id: Option<TenantId>,
132    pub headers: BTreeMap<String, String>,
133    #[serde(default, skip_serializing_if = "Option::is_none")]
134    pub batch: Option<Vec<JsonValue>>,
135    #[serde(
136        default,
137        skip_serializing_if = "Option::is_none",
138        serialize_with = "serialize_optional_bytes_b64",
139        deserialize_with = "deserialize_optional_bytes_b64"
140    )]
141    pub raw_body: Option<Vec<u8>>,
142    pub provider_payload: ProviderPayload,
143    pub signature_status: SignatureStatus,
144    #[serde(skip)]
145    pub dedupe_claimed: bool,
146}
147
148impl TriggerEvent {
149    #[allow(clippy::too_many_arguments)]
150    pub fn new(
151        provider: ProviderId,
152        kind: impl Into<String>,
153        occurred_at: Option<OffsetDateTime>,
154        dedupe_key: impl Into<String>,
155        tenant_id: Option<TenantId>,
156        headers: BTreeMap<String, String>,
157        provider_payload: ProviderPayload,
158        signature_status: SignatureStatus,
159    ) -> Self {
160        Self {
161            id: TriggerEventId::new(),
162            provider,
163            kind: kind.into(),
164            received_at: clock::now_utc(),
165            occurred_at,
166            dedupe_key: dedupe_key.into(),
167            trace_id: TraceId::new(),
168            tenant_id,
169            headers,
170            batch: None,
171            raw_body: None,
172            provider_payload,
173            signature_status,
174            dedupe_claimed: false,
175        }
176    }
177
178    pub fn dedupe_claimed(&self) -> bool {
179        self.dedupe_claimed
180    }
181
182    pub fn mark_dedupe_claimed(&mut self) {
183        self.dedupe_claimed = true;
184    }
185}
186
187/// Header-focused name for the unified [`crate::redact::RedactionPolicy`].
188///
189/// Trigger ingest paths use this alias at call sites that only redact
190/// HTTP headers, while the underlying policy also covers URLs, JSON
191/// fields, and free-form strings.
192pub type HeaderRedactionPolicy = crate::redact::RedactionPolicy;
193
194pub fn redact_headers(
195    headers: &BTreeMap<String, String>,
196    policy: &HeaderRedactionPolicy,
197) -> BTreeMap<String, String> {
198    policy.redact_headers(headers)
199}