meerkat_runtime/
durability.rs1use crate::input::{Input, InputDurability, PeerConvention};
8
9#[derive(Debug, Clone, thiserror::Error)]
11#[non_exhaustive]
12pub enum DurabilityError {
13 #[error("Derived durability forbidden for {kind}")]
15 DerivedForbidden { kind: String },
16
17 #[error("External ingress cannot submit derived inputs")]
19 ExternalDerivedForbidden,
20}
21
22pub fn validate_durability(input: &Input) -> Result<(), DurabilityError> {
24 let durability = input.header().durability;
25
26 if durability == InputDurability::Derived {
28 match &input.header().source {
29 crate::input::InputOrigin::Operator
30 | crate::input::InputOrigin::Peer { .. }
31 | crate::input::InputOrigin::External { .. } => {
32 return Err(DurabilityError::ExternalDerivedForbidden);
33 }
34 crate::input::InputOrigin::System | crate::input::InputOrigin::Flow { .. } => {}
36 }
37 }
38
39 if durability == InputDurability::Derived {
41 match input {
42 Input::Prompt(_) => {
43 return Err(DurabilityError::DerivedForbidden {
44 kind: "prompt".into(),
45 });
46 }
47 Input::Peer(p) => {
48 match &p.convention {
49 Some(
50 PeerConvention::Message
51 | PeerConvention::Request { .. }
52 | PeerConvention::ResponseTerminal { .. },
53 ) => {
54 return Err(DurabilityError::DerivedForbidden {
55 kind: format!("peer_{}", input.kind_id().0),
56 });
57 }
58 Some(PeerConvention::ResponseProgress { .. }) | None => {}
60 }
61 }
62 Input::FlowStep(_) => {
63 return Err(DurabilityError::DerivedForbidden {
64 kind: "flow_step".into(),
65 });
66 }
67 Input::ExternalEvent(_) | Input::SystemGenerated(_) | Input::Projected(_) => {}
69 }
70 }
71
72 Ok(())
73}
74
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    //! Unit tests for `validate_durability`.
    //!
    //! Rejection tests assert the exact error variant, not just `is_err()`,
    //! so a rejection for the wrong reason (origin vs. kind) fails the test.

    use super::*;
    use crate::input::*;
    use chrono::Utc;
    use meerkat_core::lifecycle::InputId;

    /// Builds a header with the given durability and origin; all other
    /// fields are set to defaults / `None`.
    fn make_header(durability: InputDurability, source: InputOrigin) -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source,
            durability,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        }
    }

    /// Header for a derived input from the `System` origin, which passes the
    /// origin check so that only the kind check decides the outcome.
    fn derived_system_header() -> InputHeader {
        make_header(InputDurability::Derived, InputOrigin::System)
    }

    /// Builds a prompt input with the given header.
    fn prompt_with(header: InputHeader) -> Input {
        Input::Prompt(PromptInput {
            header,
            text: "hi".into(),
            turn_metadata: None,
        })
    }

    /// Builds a derived, system-originated peer input with the given convention.
    fn derived_system_peer(convention: Option<PeerConvention>) -> Input {
        Input::Peer(PeerInput {
            header: derived_system_header(),
            convention,
            body: "hi".into(),
        })
    }

    /// Returns the `kind` label when validation fails with
    /// `DerivedForbidden`, and `None` for any other outcome.
    fn rejected_kind(input: &Input) -> Option<String> {
        match validate_durability(input) {
            Err(DurabilityError::DerivedForbidden { kind }) => Some(kind),
            _ => None,
        }
    }

    /// True when validation fails specifically with `ExternalDerivedForbidden`.
    fn rejected_by_origin(input: &Input) -> bool {
        matches!(
            validate_durability(input),
            Err(DurabilityError::ExternalDerivedForbidden)
        )
    }

    /// True when a peer input was rejected by the kind check with a
    /// `peer_`-prefixed label.
    fn rejected_as_peer_kind(input: &Input) -> bool {
        matches!(rejected_kind(input), Some(kind) if kind.starts_with("peer_"))
    }

    #[test]
    fn prompt_derived_rejected() {
        let input = prompt_with(derived_system_header());
        assert_eq!(rejected_kind(&input).as_deref(), Some("prompt"));
    }

    #[test]
    fn prompt_durable_accepted() {
        let input = prompt_with(make_header(
            InputDurability::Durable,
            InputOrigin::Operator,
        ));
        assert!(validate_durability(&input).is_ok());
    }

    #[test]
    fn prompt_ephemeral_accepted() {
        let input = prompt_with(make_header(
            InputDurability::Ephemeral,
            InputOrigin::Operator,
        ));
        assert!(validate_durability(&input).is_ok());
    }

    #[test]
    fn peer_message_derived_rejected() {
        let input = derived_system_peer(Some(PeerConvention::Message));
        assert!(rejected_as_peer_kind(&input));
    }

    #[test]
    fn peer_request_derived_rejected() {
        let input = derived_system_peer(Some(PeerConvention::Request {
            request_id: "r".into(),
            intent: "i".into(),
        }));
        assert!(rejected_as_peer_kind(&input));
    }

    #[test]
    fn peer_response_terminal_derived_rejected() {
        let input = derived_system_peer(Some(PeerConvention::ResponseTerminal {
            request_id: "r".into(),
            status: ResponseTerminalStatus::Completed,
        }));
        assert!(rejected_as_peer_kind(&input));
    }

    #[test]
    fn peer_response_progress_derived_accepted() {
        let input = derived_system_peer(Some(PeerConvention::ResponseProgress {
            request_id: "r".into(),
            phase: ResponseProgressPhase::InProgress,
        }));
        assert!(validate_durability(&input).is_ok());
    }

    #[test]
    fn peer_without_convention_derived_accepted() {
        let input = derived_system_peer(None);
        assert!(validate_durability(&input).is_ok());
    }

    #[test]
    fn flow_step_derived_rejected() {
        let input = Input::FlowStep(FlowStepInput {
            header: derived_system_header(),
            step_id: "s1".into(),
            instructions: "do it".into(),
            turn_metadata: None,
        });
        assert_eq!(rejected_kind(&input).as_deref(), Some("flow_step"));
    }

    #[test]
    fn external_event_derived_from_system_accepted() {
        let input = Input::ExternalEvent(ExternalEventInput {
            header: derived_system_header(),
            event_type: "test".into(),
            payload: serde_json::json!({}),
        });
        assert!(validate_durability(&input).is_ok());
    }

    #[test]
    fn external_ingress_derived_rejected() {
        let input = Input::ExternalEvent(ExternalEventInput {
            header: make_header(
                InputDurability::Derived,
                InputOrigin::External {
                    source_name: "webhook".into(),
                },
            ),
            event_type: "test".into(),
            payload: serde_json::json!({}),
        });
        assert!(rejected_by_origin(&input));
    }

    #[test]
    fn operator_derived_rejected() {
        let input = Input::SystemGenerated(SystemGeneratedInput {
            header: make_header(InputDurability::Derived, InputOrigin::Operator),
            generator: "test".into(),
            content: "content".into(),
        });
        assert!(rejected_by_origin(&input));
    }

    #[test]
    fn origin_check_takes_precedence_over_kind_check() {
        // A derived prompt from an operator fails both checks; the origin
        // check runs first, so that is the error reported.
        let input = prompt_with(make_header(
            InputDurability::Derived,
            InputOrigin::Operator,
        ));
        assert!(rejected_by_origin(&input));
    }

    #[test]
    fn system_generated_derived_from_system_accepted() {
        let input = Input::SystemGenerated(SystemGeneratedInput {
            header: derived_system_header(),
            generator: "test".into(),
            content: "content".into(),
        });
        assert!(validate_durability(&input).is_ok());
    }

    #[test]
    fn projected_derived_from_system_accepted() {
        let input = Input::Projected(ProjectedInput {
            header: derived_system_header(),
            rule_id: "rule-1".into(),
            source_event_id: "evt-1".into(),
            content: "projected".into(),
        });
        assert!(validate_durability(&input).is_ok());
    }
}