1use crate::{CheckpointKind, PluginMessage, TurnCause, TurnInput};
2
3#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
4#[serde(tag = "scope", rename_all = "snake_case")]
5pub enum TurnInputIngress {
6 ActiveTurn {
7 turn_id: String,
8 #[serde(default)]
9 min_boundary: TurnInputCheckpointBoundary,
10 },
11 NextTurn,
12}
13
14impl TurnInputIngress {
15 pub fn active_turn(
16 turn_id: impl Into<String>,
17 min_boundary: TurnInputCheckpointBoundary,
18 ) -> Self {
19 Self::ActiveTurn {
20 turn_id: turn_id.into(),
21 min_boundary,
22 }
23 }
24
25 pub fn next_turn() -> Self {
26 Self::NextTurn
27 }
28
29 pub fn active_turn_id(&self) -> Option<&str> {
30 match self {
31 Self::ActiveTurn { turn_id, .. } => Some(turn_id),
32 Self::NextTurn => None,
33 }
34 }
35
36 pub fn admits_checkpoint(&self, checkpoint: CheckpointKind) -> bool {
37 match self {
38 Self::ActiveTurn { min_boundary, .. } => min_boundary.admits(checkpoint),
39 Self::NextTurn => false,
40 }
41 }
42}
43
44#[derive(
45 Clone, Copy, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
46)]
47#[serde(rename_all = "snake_case")]
48pub enum TurnInputCheckpointBoundary {
49 #[default]
50 AfterWork,
51 BeforeCompletion,
52}
53
54impl TurnInputCheckpointBoundary {
55 pub fn admits(self, checkpoint: CheckpointKind) -> bool {
56 match self {
57 Self::AfterWork => true,
58 Self::BeforeCompletion => checkpoint == CheckpointKind::BeforeCompletion,
59 }
60 }
61}
62
63#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub enum TurnInputState {
66 PendingActive,
67 DeferredNextTurn,
68 Accepted,
69 Cancelled,
70 Completed,
71}
72
73impl TurnInputState {
74 pub fn as_str(self) -> &'static str {
75 match self {
76 Self::PendingActive => "pending_active",
77 Self::DeferredNextTurn => "deferred_next_turn",
78 Self::Accepted => "accepted",
79 Self::Cancelled => "cancelled",
80 Self::Completed => "completed",
81 }
82 }
83
84 pub fn from_wire_str(value: &str) -> Option<Self> {
85 match value {
86 "pending_active" => Some(Self::PendingActive),
87 "deferred_next_turn" => Some(Self::DeferredNextTurn),
88 "accepted" => Some(Self::Accepted),
89 "cancelled" => Some(Self::Cancelled),
90 "completed" => Some(Self::Completed),
91 _ => None,
92 }
93 }
94
95 pub fn is_next_turn_pending(self) -> bool {
96 matches!(self, Self::DeferredNextTurn)
97 }
98}
99
100#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
101pub struct PendingTurnInputDraft {
102 pub session_id: String,
103 #[serde(default, skip_serializing_if = "Option::is_none")]
104 pub input_id: Option<String>,
105 #[serde(default, skip_serializing_if = "Option::is_none")]
106 pub source_key: Option<String>,
107 pub ingress: TurnInputIngress,
108 pub input: TurnInput,
109}
110
111impl PendingTurnInputDraft {
112 pub fn new(session_id: impl Into<String>, ingress: TurnInputIngress, input: TurnInput) -> Self {
113 Self {
114 session_id: session_id.into(),
115 input_id: None,
116 source_key: None,
117 ingress,
118 input,
119 }
120 }
121
122 pub fn with_input_id(mut self, input_id: impl Into<String>) -> Self {
123 self.input_id = Some(input_id.into());
124 self
125 }
126
127 pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
128 self.source_key = Some(source_key.into());
129 self
130 }
131
132 pub fn submitted_content_matches(
133 &self,
134 existing: &PendingTurnInput,
135 ) -> Result<bool, serde_json::Error> {
136 Ok(self.ingress == existing.ingress
137 && serde_json::to_value(&self.input)? == serde_json::to_value(&existing.input)?)
138 }
139}
140
141#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
142pub struct PendingTurnInput {
143 pub input_id: String,
144 pub session_id: String,
145 pub enqueue_seq: u64,
146 #[serde(default, skip_serializing_if = "Option::is_none")]
147 pub source_key: Option<String>,
148 pub ingress: TurnInputIngress,
149 pub state: TurnInputState,
150 pub enqueued_at_ms: u64,
151 pub input: TurnInput,
152}
153
154impl PendingTurnInput {
155 pub fn source_or_id(&self) -> &str {
156 self.source_key.as_deref().unwrap_or(&self.input_id)
157 }
158
159 pub fn accepted_input(&self) -> Option<crate::AcceptedInjectedTurnInput> {
160 plugin_message_from_turn_input(&self.input).map(|message| {
161 crate::AcceptedInjectedTurnInput {
162 id: self
163 .source_key
164 .as_deref()
165 .map(source_key_display_id)
166 .or_else(|| Some(self.input_id.clone())),
167 message,
168 }
169 })
170 }
171}
172
173#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
174#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
175pub enum PendingTurnInputCancelTarget {
176 InputId(String),
177 SourceKey(String),
178}
179
180impl PendingTurnInputCancelTarget {
181 pub fn input_id(input_id: impl Into<String>) -> Self {
182 Self::InputId(input_id.into())
183 }
184
185 pub fn source_key(source_key: impl Into<String>) -> Self {
186 Self::SourceKey(source_key.into())
187 }
188}
189
190#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
191pub struct PendingTurnInputClaimDiagnostics {
192 pub state: TurnInputState,
193 #[serde(default, skip_serializing_if = "Option::is_none")]
194 pub claim_id: Option<String>,
195 #[serde(default, skip_serializing_if = "Option::is_none")]
196 pub claim_owner: Option<crate::LeaseOwnerIdentity>,
197 #[serde(default, skip_serializing_if = "Option::is_none")]
198 pub claim_expires_at_ms: Option<u64>,
199 pub claim_fencing_token: u64,
200}
201
202#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
203#[serde(tag = "outcome", content = "data", rename_all = "snake_case")]
204pub enum PendingTurnInputCancelOutcome {
205 Cancelled(PendingTurnInput),
206 AlreadyClaimed {
207 input: PendingTurnInput,
208 #[serde(default, skip_serializing_if = "Option::is_none")]
209 claim: Option<PendingTurnInputClaimDiagnostics>,
210 },
211 AlreadyCompleted(PendingTurnInput),
212 AlreadyCancelled(PendingTurnInput),
213 NotFound,
214}
215
216impl PendingTurnInputCancelOutcome {
217 pub fn is_cancelled(&self) -> bool {
218 matches!(self, Self::Cancelled(_))
219 }
220
221 pub fn input(&self) -> Option<&PendingTurnInput> {
222 match self {
223 Self::Cancelled(input)
224 | Self::AlreadyClaimed { input, .. }
225 | Self::AlreadyCompleted(input)
226 | Self::AlreadyCancelled(input) => Some(input),
227 Self::NotFound => None,
228 }
229 }
230}
231
232#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
233pub struct PendingTurnInputCancelResult {
234 pub target: PendingTurnInputCancelTarget,
235 pub outcome: PendingTurnInputCancelOutcome,
236}
237
238#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
239#[serde(tag = "outcome", content = "data", rename_all = "snake_case")]
240pub enum PendingTurnInputSuffixCancelOutcome {
241 AnchorNotFound {
242 anchor: PendingTurnInputCancelTarget,
243 },
244 Outcomes {
245 anchor: PendingTurnInputCancelTarget,
246 outcomes: Vec<PendingTurnInputCancelOutcome>,
247 },
248}
249
250#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
251#[serde(tag = "kind", rename_all = "snake_case")]
252pub enum TurnInputClaimMode {
253 ActiveTurn {
254 turn_id: String,
255 checkpoint: CheckpointKind,
256 },
257 NextTurn,
258}
259
260#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
261pub struct TurnInputCompletion {
262 pub session_id: String,
263 pub claim_id: String,
264 pub lease_token: String,
265 pub input_ids: Vec<String>,
266}
267
268#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
269pub struct TurnInputClaim {
270 pub session_id: String,
271 pub claim_id: String,
272 pub owner: crate::LeaseOwnerIdentity,
273 pub lease_token: String,
274 pub fencing_token: u64,
275 pub claimed_at_epoch_ms: u64,
276 pub expires_at_epoch_ms: u64,
277 pub mode: TurnInputClaimMode,
278 pub inputs: Vec<PendingTurnInput>,
279}
280
281impl TurnInputClaim {
282 pub fn completion(&self) -> TurnInputCompletion {
283 TurnInputCompletion {
284 session_id: self.session_id.clone(),
285 claim_id: self.claim_id.clone(),
286 lease_token: self.lease_token.clone(),
287 input_ids: self
288 .inputs
289 .iter()
290 .map(|input| input.input_id.clone())
291 .collect(),
292 }
293 }
294
295 pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
296 self.inputs
297 .iter()
298 .filter_map(PendingTurnInput::accepted_input)
299 .collect()
300 }
301
302 pub async fn materialize_for_checkpoint(
303 &self,
304 attachment_store: &dyn crate::AttachmentStore,
305 ) -> Result<QueuedCheckpointTurnInput, String> {
306 let mut transient_messages = Vec::new();
307 for input in &self.inputs {
308 if let Some(message) =
309 plugin_message_from_turn_input_with_attachments(&input.input, attachment_store)
310 .await?
311 {
312 transient_messages.push(message);
313 }
314 }
315 Ok(QueuedCheckpointTurnInput {
316 transient_messages,
317 turn_causes: Vec::new(),
318 })
319 }
320
321 pub fn materialize_for_turn(&self) -> TurnInput {
322 let mut input_items = Vec::new();
323 let mut image_blobs = std::collections::HashMap::new();
324 let mut protocol_turn_options = None;
325 let mut trace_turn_id = None;
326 for pending in &self.inputs {
327 input_items.extend(pending.input.items.clone());
328 image_blobs.extend(pending.input.image_blobs.clone());
329 if protocol_turn_options.is_none() {
330 protocol_turn_options = pending.input.protocol_turn_options.clone();
331 }
332 if trace_turn_id.is_none() {
333 trace_turn_id = pending.input.trace_turn_id.clone();
334 }
335 }
336 TurnInput {
337 items: input_items,
338 image_blobs,
339 protocol_turn_options,
340 trace_turn_id,
341 protocol_extension: None,
342 turn_context: crate::TurnContext::default(),
343 }
344 }
345}
346
347#[derive(Clone, Debug, Default)]
348pub struct QueuedCheckpointTurnInput {
349 pub transient_messages: Vec<PluginMessage>,
350 pub turn_causes: Vec<TurnCause>,
351}
352
/// Returns the display form of a source key.
///
/// A leading `host:` is removed if present; otherwise a leading `injection:`
/// is removed if present; otherwise the key is returned unchanged. At most one
/// prefix is stripped.
pub(crate) fn source_key_display_id(source: &str) -> String {
    // Checked in order, so `host:` takes precedence over `injection:`.
    const PREFIXES: [&str; 2] = ["host:", "injection:"];
    PREFIXES
        .iter()
        .find_map(|prefix| source.strip_prefix(*prefix))
        .unwrap_or(source)
        .to_owned()
}
360
361pub(crate) fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
362 let mut text = Vec::new();
363 let mut images = Vec::new();
364 for item in &input.items {
365 match item {
366 crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
367 text.push(item_text.clone());
368 }
369 crate::InputItem::Text { .. } => {}
370 crate::InputItem::ImageRef { id } => {
371 if let Some(bytes) = input.image_blobs.get(id).cloned() {
372 images.push(bytes);
373 }
374 }
375 }
376 }
377 if text.is_empty() && images.is_empty() {
378 return None;
379 }
380 Some(PluginMessage {
381 role: crate::MessageRole::User,
382 content: text.join("\n"),
383 origin: None,
384 parts: Vec::new(),
385 images,
386 })
387}
388
389pub(crate) async fn plugin_message_from_turn_input_with_attachments(
390 input: &TurnInput,
391 attachment_store: &dyn crate::AttachmentStore,
392) -> Result<Option<PluginMessage>, String> {
393 let normalized =
394 super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)
395 .await?;
396 let has_image = normalized
397 .iter()
398 .any(|item| matches!(item, super::NormalizedItem::Image(_)));
399 if !has_image {
400 return Ok(plugin_message_from_turn_input(input));
401 }
402
403 let mut content = Vec::new();
404 let mut parts = Vec::new();
405 for item in normalized {
406 match item {
407 super::NormalizedItem::Text(text) if !text.is_empty() => {
408 let part_id = format!("pending.p{}", parts.len());
409 content.push(text.clone());
410 parts.push(crate::Part {
411 id: part_id,
412 kind: crate::PartKind::Text,
413 content: text,
414 attachment: None,
415 tool_call_id: None,
416 tool_name: None,
417 tool_replay: None,
418 prune_state: crate::PruneState::Intact,
419 reasoning_meta: None,
420 response_meta: None,
421 });
422 }
423 super::NormalizedItem::Text(_) => {}
424 super::NormalizedItem::Image(reference) => {
425 let part_id = format!("pending.p{}", parts.len());
426 parts.push(crate::Part {
427 id: part_id,
428 kind: crate::PartKind::Image,
429 content: String::new(),
430 attachment: Some(crate::session_model::message::PartAttachment { reference }),
431 tool_call_id: None,
432 tool_name: None,
433 tool_replay: None,
434 prune_state: crate::PruneState::Intact,
435 reasoning_meta: None,
436 response_meta: None,
437 });
438 }
439 }
440 }
441 if parts.is_empty() {
442 return Ok(None);
443 }
444 Ok(Some(PluginMessage {
445 role: crate::MessageRole::User,
446 content: content.join("\n"),
447 origin: None,
448 parts,
449 images: Vec::new(),
450 }))
451}