1use crate::{CheckpointKind, PluginMessage, TurnCause, TurnInput};
2
/// Turn-input claim time-to-live in milliseconds (30 seconds).
///
/// NOTE(review): the consumer of this constant is not visible in this file;
/// the meaning is taken from the name — confirm against the claim store.
pub const TURN_INPUT_CLAIM_TTL_MS: u64 = 30_000;
4
5#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
6#[serde(tag = "scope", rename_all = "snake_case")]
7pub enum TurnInputIngress {
8 ActiveTurn {
9 turn_id: String,
10 #[serde(default)]
11 min_boundary: TurnInputCheckpointBoundary,
12 },
13 NextTurn,
14}
15
16impl TurnInputIngress {
17 pub fn active_turn(
18 turn_id: impl Into<String>,
19 min_boundary: TurnInputCheckpointBoundary,
20 ) -> Self {
21 Self::ActiveTurn {
22 turn_id: turn_id.into(),
23 min_boundary,
24 }
25 }
26
27 pub fn next_turn() -> Self {
28 Self::NextTurn
29 }
30
31 pub fn active_turn_id(&self) -> Option<&str> {
32 match self {
33 Self::ActiveTurn { turn_id, .. } => Some(turn_id),
34 Self::NextTurn => None,
35 }
36 }
37
38 pub fn admits_checkpoint(&self, checkpoint: CheckpointKind) -> bool {
39 match self {
40 Self::ActiveTurn { min_boundary, .. } => min_boundary.admits(checkpoint),
41 Self::NextTurn => false,
42 }
43 }
44}
45
46#[derive(
47 Clone, Copy, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
48)]
49#[serde(rename_all = "snake_case")]
50pub enum TurnInputCheckpointBoundary {
51 #[default]
52 AfterWork,
53 BeforeCompletion,
54}
55
56impl TurnInputCheckpointBoundary {
57 pub fn admits(self, checkpoint: CheckpointKind) -> bool {
58 match self {
59 Self::AfterWork => true,
60 Self::BeforeCompletion => checkpoint == CheckpointKind::BeforeCompletion,
61 }
62 }
63}
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub enum TurnInputState {
68 PendingActive,
69 DeferredNextTurn,
70 Accepted,
71 Cancelled,
72 Completed,
73}
74
75impl TurnInputState {
76 pub fn as_str(self) -> &'static str {
77 match self {
78 Self::PendingActive => "pending_active",
79 Self::DeferredNextTurn => "deferred_next_turn",
80 Self::Accepted => "accepted",
81 Self::Cancelled => "cancelled",
82 Self::Completed => "completed",
83 }
84 }
85
86 pub fn from_wire_str(value: &str) -> Option<Self> {
87 match value {
88 "pending_active" => Some(Self::PendingActive),
89 "deferred_next_turn" => Some(Self::DeferredNextTurn),
90 "accepted" => Some(Self::Accepted),
91 "cancelled" => Some(Self::Cancelled),
92 "completed" => Some(Self::Completed),
93 _ => None,
94 }
95 }
96
97 pub fn is_next_turn_pending(self) -> bool {
98 matches!(self, Self::DeferredNextTurn)
99 }
100}
101
102#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
103pub struct PendingTurnInputDraft {
104 pub session_id: String,
105 #[serde(default, skip_serializing_if = "Option::is_none")]
106 pub input_id: Option<String>,
107 #[serde(default, skip_serializing_if = "Option::is_none")]
108 pub source_key: Option<String>,
109 pub ingress: TurnInputIngress,
110 pub input: TurnInput,
111}
112
113impl PendingTurnInputDraft {
114 pub fn new(session_id: impl Into<String>, ingress: TurnInputIngress, input: TurnInput) -> Self {
115 Self {
116 session_id: session_id.into(),
117 input_id: None,
118 source_key: None,
119 ingress,
120 input,
121 }
122 }
123
124 pub fn with_input_id(mut self, input_id: impl Into<String>) -> Self {
125 self.input_id = Some(input_id.into());
126 self
127 }
128
129 pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
130 self.source_key = Some(source_key.into());
131 self
132 }
133
134 pub fn submitted_content_matches(
135 &self,
136 existing: &PendingTurnInput,
137 ) -> Result<bool, serde_json::Error> {
138 Ok(self.ingress == existing.ingress
139 && serde_json::to_value(&self.input)? == serde_json::to_value(&existing.input)?)
140 }
141}
142
143#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
144pub struct PendingTurnInput {
145 pub input_id: String,
146 pub session_id: String,
147 pub enqueue_seq: u64,
148 #[serde(default, skip_serializing_if = "Option::is_none")]
149 pub source_key: Option<String>,
150 pub ingress: TurnInputIngress,
151 pub state: TurnInputState,
152 pub enqueued_at_ms: u64,
153 pub input: TurnInput,
154}
155
156impl PendingTurnInput {
157 pub fn source_or_id(&self) -> &str {
158 self.source_key.as_deref().unwrap_or(&self.input_id)
159 }
160
161 pub fn accepted_input(&self) -> Option<crate::AcceptedInjectedTurnInput> {
162 plugin_message_from_turn_input(&self.input).map(|message| {
163 crate::AcceptedInjectedTurnInput {
164 id: self
165 .source_key
166 .as_deref()
167 .map(source_key_display_id)
168 .or_else(|| Some(self.input_id.clone())),
169 message,
170 }
171 })
172 }
173}
174
175#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
176#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
177pub enum PendingTurnInputCancelTarget {
178 InputId(String),
179 SourceKey(String),
180}
181
182impl PendingTurnInputCancelTarget {
183 pub fn input_id(input_id: impl Into<String>) -> Self {
184 Self::InputId(input_id.into())
185 }
186
187 pub fn source_key(source_key: impl Into<String>) -> Self {
188 Self::SourceKey(source_key.into())
189 }
190}
191
192#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
193pub struct PendingTurnInputClaimDiagnostics {
194 pub state: TurnInputState,
195 #[serde(default, skip_serializing_if = "Option::is_none")]
196 pub claim_id: Option<String>,
197 #[serde(default, skip_serializing_if = "Option::is_none")]
198 pub claim_owner: Option<crate::LeaseOwnerIdentity>,
199 #[serde(default, skip_serializing_if = "Option::is_none")]
200 pub claim_expires_at_ms: Option<u64>,
201 pub claim_fencing_token: u64,
202}
203
204#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
205#[serde(tag = "outcome", content = "data", rename_all = "snake_case")]
206pub enum PendingTurnInputCancelOutcome {
207 Cancelled(PendingTurnInput),
208 AlreadyClaimed {
209 input: PendingTurnInput,
210 #[serde(default, skip_serializing_if = "Option::is_none")]
211 claim: Option<PendingTurnInputClaimDiagnostics>,
212 },
213 AlreadyCompleted(PendingTurnInput),
214 AlreadyCancelled(PendingTurnInput),
215 NotFound,
216}
217
218impl PendingTurnInputCancelOutcome {
219 pub fn is_cancelled(&self) -> bool {
220 matches!(self, Self::Cancelled(_))
221 }
222
223 pub fn input(&self) -> Option<&PendingTurnInput> {
224 match self {
225 Self::Cancelled(input)
226 | Self::AlreadyClaimed { input, .. }
227 | Self::AlreadyCompleted(input)
228 | Self::AlreadyCancelled(input) => Some(input),
229 Self::NotFound => None,
230 }
231 }
232}
233
234#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
235pub struct PendingTurnInputCancelResult {
236 pub target: PendingTurnInputCancelTarget,
237 pub outcome: PendingTurnInputCancelOutcome,
238}
239
240#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
241#[serde(tag = "outcome", content = "data", rename_all = "snake_case")]
242pub enum PendingTurnInputSuffixCancelOutcome {
243 AnchorNotFound {
244 anchor: PendingTurnInputCancelTarget,
245 },
246 Outcomes {
247 anchor: PendingTurnInputCancelTarget,
248 outcomes: Vec<PendingTurnInputCancelOutcome>,
249 },
250}
251
252#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
253#[serde(tag = "kind", rename_all = "snake_case")]
254pub enum TurnInputClaimMode {
255 ActiveTurn {
256 turn_id: String,
257 checkpoint: CheckpointKind,
258 },
259 NextTurn,
260}
261
262#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
263pub struct TurnInputCompletion {
264 pub session_id: String,
265 pub claim_id: String,
266 pub lease_token: String,
267 pub input_ids: Vec<String>,
268}
269
270#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
271pub struct TurnInputClaim {
272 pub session_id: String,
273 pub claim_id: String,
274 pub owner: crate::LeaseOwnerIdentity,
275 pub lease_token: String,
276 pub fencing_token: u64,
277 pub claimed_at_epoch_ms: u64,
278 pub expires_at_epoch_ms: u64,
279 pub mode: TurnInputClaimMode,
280 pub inputs: Vec<PendingTurnInput>,
281}
282
283impl TurnInputClaim {
284 pub fn completion(&self) -> TurnInputCompletion {
285 TurnInputCompletion {
286 session_id: self.session_id.clone(),
287 claim_id: self.claim_id.clone(),
288 lease_token: self.lease_token.clone(),
289 input_ids: self
290 .inputs
291 .iter()
292 .map(|input| input.input_id.clone())
293 .collect(),
294 }
295 }
296
297 pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
298 self.inputs
299 .iter()
300 .filter_map(PendingTurnInput::accepted_input)
301 .collect()
302 }
303
304 pub async fn materialize_for_checkpoint(
305 &self,
306 attachment_store: &dyn crate::AttachmentStore,
307 ) -> Result<QueuedCheckpointTurnInput, String> {
308 let mut transient_messages = Vec::new();
309 for input in &self.inputs {
310 if let Some(message) =
311 plugin_message_from_turn_input_with_attachments(&input.input, attachment_store)
312 .await?
313 {
314 transient_messages.push(message);
315 }
316 }
317 Ok(QueuedCheckpointTurnInput {
318 transient_messages,
319 turn_causes: Vec::new(),
320 })
321 }
322
323 pub fn materialize_for_turn(&self) -> TurnInput {
324 let mut input_items = Vec::new();
325 let mut image_blobs = std::collections::HashMap::new();
326 let mut protocol_turn_options = None;
327 let mut trace_turn_id = None;
328 for pending in &self.inputs {
329 input_items.extend(pending.input.items.clone());
330 image_blobs.extend(pending.input.image_blobs.clone());
331 if protocol_turn_options.is_none() {
332 protocol_turn_options = pending.input.protocol_turn_options.clone();
333 }
334 if trace_turn_id.is_none() {
335 trace_turn_id = pending.input.trace_turn_id.clone();
336 }
337 }
338 TurnInput {
339 items: input_items,
340 image_blobs,
341 protocol_turn_options,
342 trace_turn_id,
343 protocol_extension: None,
344 turn_context: crate::TurnContext::default(),
345 }
346 }
347}
348
349#[derive(Clone, Debug, Default)]
350pub struct QueuedCheckpointTurnInput {
351 pub transient_messages: Vec<PluginMessage>,
352 pub turn_causes: Vec<TurnCause>,
353}
354
/// Returns `source` without its leading `host:` or `injection:` prefix.
///
/// At most one prefix is removed, and `host:` is tried first. A string that
/// starts with neither prefix is returned unchanged (as an owned copy).
pub(crate) fn source_key_display_id(source: &str) -> String {
    let display = match source.strip_prefix("host:") {
        Some(rest) => rest,
        None => source.strip_prefix("injection:").unwrap_or(source),
    };
    display.to_owned()
}
362
363pub(crate) fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
364 let mut text = Vec::new();
365 let mut images = Vec::new();
366 for item in &input.items {
367 match item {
368 crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
369 text.push(item_text.clone());
370 }
371 crate::InputItem::Text { .. } => {}
372 crate::InputItem::ImageRef { id } => {
373 if let Some(bytes) = input.image_blobs.get(id).cloned() {
374 images.push(bytes);
375 }
376 }
377 }
378 }
379 if text.is_empty() && images.is_empty() {
380 return None;
381 }
382 Some(PluginMessage {
383 role: crate::MessageRole::User,
384 content: text.join("\n"),
385 origin: None,
386 parts: Vec::new(),
387 images,
388 })
389}
390
391pub(crate) async fn plugin_message_from_turn_input_with_attachments(
392 input: &TurnInput,
393 attachment_store: &dyn crate::AttachmentStore,
394) -> Result<Option<PluginMessage>, String> {
395 let normalized =
396 super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)
397 .await?;
398 let has_image = normalized
399 .iter()
400 .any(|item| matches!(item, super::NormalizedItem::Image(_)));
401 if !has_image {
402 return Ok(plugin_message_from_turn_input(input));
403 }
404
405 let mut content = Vec::new();
406 let mut parts = Vec::new();
407 for item in normalized {
408 match item {
409 super::NormalizedItem::Text(text) if !text.is_empty() => {
410 let part_id = format!("pending.p{}", parts.len());
411 content.push(text.clone());
412 parts.push(crate::Part {
413 id: part_id,
414 kind: crate::PartKind::Text,
415 content: text,
416 attachment: None,
417 tool_call_id: None,
418 tool_name: None,
419 tool_replay: None,
420 prune_state: crate::PruneState::Intact,
421 reasoning_meta: None,
422 response_meta: None,
423 });
424 }
425 super::NormalizedItem::Text(_) => {}
426 super::NormalizedItem::Image(reference) => {
427 let part_id = format!("pending.p{}", parts.len());
428 parts.push(crate::Part {
429 id: part_id,
430 kind: crate::PartKind::Image,
431 content: String::new(),
432 attachment: Some(crate::session_model::message::PartAttachment { reference }),
433 tool_call_id: None,
434 tool_name: None,
435 tool_replay: None,
436 prune_state: crate::PruneState::Intact,
437 reasoning_meta: None,
438 response_meta: None,
439 });
440 }
441 }
442 }
443 if parts.is_empty() {
444 return Ok(None);
445 }
446 Ok(Some(PluginMessage {
447 role: crate::MessageRole::User,
448 content: content.join("\n"),
449 origin: None,
450 parts,
451 images: Vec::new(),
452 }))
453}