1use crate::input::{Input, InputDurability, PeerConvention};
8
9#[derive(Debug, Clone, thiserror::Error)]
11#[non_exhaustive]
12pub enum DurabilityError {
13 #[error("Derived durability forbidden for {kind}")]
15 DerivedForbidden { kind: String },
16
17 #[error("External ingress cannot submit derived inputs")]
19 ExternalDerivedForbidden,
20}
21
22pub fn validate_durability(input: &Input) -> Result<(), DurabilityError> {
24 let durability = input.header().durability;
25
26 if durability == InputDurability::Derived {
28 match &input.header().source {
29 crate::input::InputOrigin::Operator
30 | crate::input::InputOrigin::Peer { .. }
31 | crate::input::InputOrigin::External { .. } => {
32 return Err(DurabilityError::ExternalDerivedForbidden);
33 }
34 crate::input::InputOrigin::System | crate::input::InputOrigin::Flow { .. } => {}
36 }
37 }
38
39 if durability == InputDurability::Derived {
41 match input {
42 Input::Prompt(_) => {
43 return Err(DurabilityError::DerivedForbidden {
44 kind: "prompt".into(),
45 });
46 }
47 Input::Peer(p) => {
48 match &p.convention {
49 Some(
50 PeerConvention::Message
51 | PeerConvention::Request { .. }
52 | PeerConvention::ResponseTerminal { .. },
53 ) => {
54 return Err(DurabilityError::DerivedForbidden {
55 kind: format!("peer_{}", input.kind().as_str()),
56 });
57 }
58 Some(PeerConvention::ResponseProgress { .. }) | None => {}
60 }
61 }
62 Input::FlowStep(_) => {
63 return Err(DurabilityError::DerivedForbidden {
64 kind: "flow_step".into(),
65 });
66 }
67 Input::ExternalEvent(_) | Input::Continuation(_) | Input::Operation(_) => {}
70 }
71 }
72
73 Ok(())
74}
75
76#[cfg(test)]
77#[allow(clippy::unwrap_used)]
78mod tests {
79 use super::*;
80 use crate::input::*;
81 use chrono::Utc;
82 use meerkat_core::lifecycle::InputId;
83 use meerkat_core::types::HandlingMode;
84
85 fn make_header(durability: InputDurability, source: InputOrigin) -> InputHeader {
86 InputHeader {
87 id: InputId::new(),
88 timestamp: Utc::now(),
89 source,
90 durability,
91 visibility: InputVisibility::default(),
92 idempotency_key: None,
93 supersession_key: None,
94 correlation_id: None,
95 }
96 }
97
98 #[test]
99 fn prompt_derived_rejected() {
100 let input = Input::Prompt(PromptInput {
101 header: make_header(InputDurability::Derived, InputOrigin::System),
102 text: "hi".into(),
103 blocks: None,
104 typed_turn_appends: Vec::new(),
105 turn_metadata: None,
106 });
107 assert!(validate_durability(&input).is_err());
108 }
109
110 #[test]
111 fn prompt_durable_accepted() {
112 let input = Input::Prompt(PromptInput {
113 header: make_header(InputDurability::Durable, InputOrigin::Operator),
114 text: "hi".into(),
115 blocks: None,
116 typed_turn_appends: Vec::new(),
117 turn_metadata: None,
118 });
119 assert!(validate_durability(&input).is_ok());
120 }
121
122 #[test]
123 fn prompt_ephemeral_accepted() {
124 let input = Input::Prompt(PromptInput {
125 header: make_header(InputDurability::Ephemeral, InputOrigin::Operator),
126 text: "hi".into(),
127 blocks: None,
128 typed_turn_appends: Vec::new(),
129 turn_metadata: None,
130 });
131 assert!(validate_durability(&input).is_ok());
132 }
133
134 #[test]
135 fn peer_message_derived_rejected() {
136 let input = Input::Peer(PeerInput {
137 header: make_header(InputDurability::Derived, InputOrigin::System),
138 convention: Some(PeerConvention::Message),
139 body: "hi".into(),
140 payload: None,
141 blocks: None,
142 handling_mode: None,
143 });
144 assert!(validate_durability(&input).is_err());
145 }
146
147 #[test]
148 fn peer_request_derived_rejected() {
149 let input = Input::Peer(PeerInput {
150 header: make_header(InputDurability::Derived, InputOrigin::System),
151 convention: Some(PeerConvention::Request {
152 request_id: "r".into(),
153 intent: "i".into(),
154 }),
155 body: "hi".into(),
156 payload: Some(serde_json::json!({"subject": "x"})),
157 blocks: None,
158 handling_mode: None,
159 });
160 assert!(validate_durability(&input).is_err());
161 }
162
163 #[test]
164 fn peer_response_terminal_derived_rejected() {
165 let input = Input::Peer(PeerInput {
166 header: make_header(InputDurability::Derived, InputOrigin::System),
167 convention: Some(PeerConvention::ResponseTerminal {
168 request_id: "r".into(),
169 status: ResponseTerminalStatus::Completed,
170 }),
171 body: "done".into(),
172 payload: Some(serde_json::json!({"ok": true})),
173 blocks: None,
174 handling_mode: None,
175 });
176 assert!(validate_durability(&input).is_err());
177 }
178
179 #[test]
180 fn peer_response_progress_derived_accepted() {
181 let input = Input::Peer(PeerInput {
182 header: make_header(InputDurability::Derived, InputOrigin::System),
183 convention: Some(PeerConvention::ResponseProgress {
184 request_id: "r".into(),
185 phase: ResponseProgressPhase::InProgress,
186 }),
187 body: "working".into(),
188 payload: Some(serde_json::json!({"progress": "working"})),
189 blocks: None,
190 handling_mode: None,
191 });
192 assert!(validate_durability(&input).is_ok());
193 }
194
195 #[test]
196 fn flow_step_derived_rejected() {
197 let input = Input::FlowStep(FlowStepInput {
198 header: make_header(InputDurability::Derived, InputOrigin::System),
199 step_id: "s1".into(),
200 instructions: "do it".into(),
201 blocks: None,
202 turn_metadata: None,
203 });
204 assert!(validate_durability(&input).is_err());
205 }
206
207 #[test]
208 fn external_event_derived_from_system_accepted() {
209 let input = Input::ExternalEvent(ExternalEventInput {
210 header: make_header(InputDurability::Derived, InputOrigin::System),
211 event_type: "test".into(),
212 payload: serde_json::json!({}),
213 blocks: None,
214 handling_mode: HandlingMode::Queue,
215 render_metadata: None,
216 });
217 assert!(validate_durability(&input).is_ok());
218 }
219
220 #[test]
221 fn external_ingress_derived_rejected() {
222 let input = Input::ExternalEvent(ExternalEventInput {
223 header: make_header(
224 InputDurability::Derived,
225 InputOrigin::External {
226 source_name: "webhook".into(),
227 },
228 ),
229 event_type: "test".into(),
230 payload: serde_json::json!({}),
231 blocks: None,
232 handling_mode: HandlingMode::Queue,
233 render_metadata: None,
234 });
235 assert!(validate_durability(&input).is_err());
236 }
237
238 #[test]
239 fn operator_derived_rejected() {
240 let input = Input::Continuation(ContinuationInput {
241 header: make_header(InputDurability::Derived, InputOrigin::Operator),
242 reason: "test".into(),
243 handling_mode: meerkat_core::types::HandlingMode::Steer,
244 request_id: None,
245 });
246 assert!(validate_durability(&input).is_err());
247 }
248
249 #[test]
250 fn operation_derived_from_system_accepted() {
251 let input = Input::Operation(OperationInput {
252 header: make_header(InputDurability::Derived, InputOrigin::System),
253 operation_id: meerkat_core::ops::OperationId::new(),
254 event: meerkat_core::ops::OpEvent::Cancelled {
255 id: meerkat_core::ops::OperationId::new(),
256 },
257 });
258 assert!(validate_durability(&input).is_ok());
259 }
260}