Skip to main content

lash_core/runtime/
turn_input_ingress.rs

1use crate::{CheckpointKind, PluginMessage, TurnCause, TurnInput};
2
3pub const TURN_INPUT_CLAIM_TTL_MS: u64 = 30 * 1000;
4
5#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
6#[serde(tag = "scope", rename_all = "snake_case")]
7pub enum TurnInputIngress {
8    ActiveTurn {
9        turn_id: String,
10        #[serde(default)]
11        min_boundary: TurnInputCheckpointBoundary,
12    },
13    NextTurn,
14}
15
16impl TurnInputIngress {
17    pub fn active_turn(
18        turn_id: impl Into<String>,
19        min_boundary: TurnInputCheckpointBoundary,
20    ) -> Self {
21        Self::ActiveTurn {
22            turn_id: turn_id.into(),
23            min_boundary,
24        }
25    }
26
27    pub fn next_turn() -> Self {
28        Self::NextTurn
29    }
30
31    pub fn active_turn_id(&self) -> Option<&str> {
32        match self {
33            Self::ActiveTurn { turn_id, .. } => Some(turn_id),
34            Self::NextTurn => None,
35        }
36    }
37
38    pub fn admits_checkpoint(&self, checkpoint: CheckpointKind) -> bool {
39        match self {
40            Self::ActiveTurn { min_boundary, .. } => min_boundary.admits(checkpoint),
41            Self::NextTurn => false,
42        }
43    }
44}
45
46#[derive(
47    Clone, Copy, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
48)]
49#[serde(rename_all = "snake_case")]
50pub enum TurnInputCheckpointBoundary {
51    #[default]
52    AfterWork,
53    BeforeCompletion,
54}
55
56impl TurnInputCheckpointBoundary {
57    pub fn admits(self, checkpoint: CheckpointKind) -> bool {
58        match self {
59            Self::AfterWork => true,
60            Self::BeforeCompletion => checkpoint == CheckpointKind::BeforeCompletion,
61        }
62    }
63}
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub enum TurnInputState {
68    PendingActive,
69    DeferredNextTurn,
70    Accepted,
71    Cancelled,
72    Completed,
73}
74
75impl TurnInputState {
76    pub fn as_str(self) -> &'static str {
77        match self {
78            Self::PendingActive => "pending_active",
79            Self::DeferredNextTurn => "deferred_next_turn",
80            Self::Accepted => "accepted",
81            Self::Cancelled => "cancelled",
82            Self::Completed => "completed",
83        }
84    }
85
86    pub fn from_wire_str(value: &str) -> Option<Self> {
87        match value {
88            "pending_active" => Some(Self::PendingActive),
89            "deferred_next_turn" => Some(Self::DeferredNextTurn),
90            "accepted" => Some(Self::Accepted),
91            "cancelled" => Some(Self::Cancelled),
92            "completed" => Some(Self::Completed),
93            _ => None,
94        }
95    }
96
97    pub fn is_next_turn_pending(self) -> bool {
98        matches!(self, Self::DeferredNextTurn)
99    }
100}
101
102#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
103pub struct PendingTurnInputDraft {
104    pub session_id: String,
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub input_id: Option<String>,
107    #[serde(default, skip_serializing_if = "Option::is_none")]
108    pub source_key: Option<String>,
109    pub ingress: TurnInputIngress,
110    pub input: TurnInput,
111}
112
113impl PendingTurnInputDraft {
114    pub fn new(session_id: impl Into<String>, ingress: TurnInputIngress, input: TurnInput) -> Self {
115        Self {
116            session_id: session_id.into(),
117            input_id: None,
118            source_key: None,
119            ingress,
120            input,
121        }
122    }
123
124    pub fn with_input_id(mut self, input_id: impl Into<String>) -> Self {
125        self.input_id = Some(input_id.into());
126        self
127    }
128
129    pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
130        self.source_key = Some(source_key.into());
131        self
132    }
133
134    pub fn submitted_content_matches(
135        &self,
136        existing: &PendingTurnInput,
137    ) -> Result<bool, serde_json::Error> {
138        Ok(self.ingress == existing.ingress
139            && serde_json::to_value(&self.input)? == serde_json::to_value(&existing.input)?)
140    }
141}
142
143#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
144pub struct PendingTurnInput {
145    pub input_id: String,
146    pub session_id: String,
147    pub enqueue_seq: u64,
148    #[serde(default, skip_serializing_if = "Option::is_none")]
149    pub source_key: Option<String>,
150    pub ingress: TurnInputIngress,
151    pub state: TurnInputState,
152    pub enqueued_at_ms: u64,
153    pub input: TurnInput,
154}
155
156impl PendingTurnInput {
157    pub fn source_or_id(&self) -> &str {
158        self.source_key.as_deref().unwrap_or(&self.input_id)
159    }
160
161    pub fn accepted_input(&self) -> Option<crate::AcceptedInjectedTurnInput> {
162        plugin_message_from_turn_input(&self.input).map(|message| {
163            crate::AcceptedInjectedTurnInput {
164                id: self
165                    .source_key
166                    .as_deref()
167                    .map(source_key_display_id)
168                    .or_else(|| Some(self.input_id.clone())),
169                message,
170            }
171        })
172    }
173}
174
175#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
176#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
177pub enum PendingTurnInputCancelTarget {
178    InputId(String),
179    SourceKey(String),
180}
181
182impl PendingTurnInputCancelTarget {
183    pub fn input_id(input_id: impl Into<String>) -> Self {
184        Self::InputId(input_id.into())
185    }
186
187    pub fn source_key(source_key: impl Into<String>) -> Self {
188        Self::SourceKey(source_key.into())
189    }
190}
191
192#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
193pub struct PendingTurnInputClaimDiagnostics {
194    pub state: TurnInputState,
195    #[serde(default, skip_serializing_if = "Option::is_none")]
196    pub claim_id: Option<String>,
197    #[serde(default, skip_serializing_if = "Option::is_none")]
198    pub claim_owner: Option<crate::LeaseOwnerIdentity>,
199    #[serde(default, skip_serializing_if = "Option::is_none")]
200    pub claim_expires_at_ms: Option<u64>,
201    pub claim_fencing_token: u64,
202}
203
204#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
205#[serde(tag = "outcome", content = "data", rename_all = "snake_case")]
206pub enum PendingTurnInputCancelOutcome {
207    Cancelled(PendingTurnInput),
208    AlreadyClaimed {
209        input: PendingTurnInput,
210        #[serde(default, skip_serializing_if = "Option::is_none")]
211        claim: Option<PendingTurnInputClaimDiagnostics>,
212    },
213    AlreadyCompleted(PendingTurnInput),
214    AlreadyCancelled(PendingTurnInput),
215    NotFound,
216}
217
218impl PendingTurnInputCancelOutcome {
219    pub fn is_cancelled(&self) -> bool {
220        matches!(self, Self::Cancelled(_))
221    }
222
223    pub fn input(&self) -> Option<&PendingTurnInput> {
224        match self {
225            Self::Cancelled(input)
226            | Self::AlreadyClaimed { input, .. }
227            | Self::AlreadyCompleted(input)
228            | Self::AlreadyCancelled(input) => Some(input),
229            Self::NotFound => None,
230        }
231    }
232}
233
234#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
235pub struct PendingTurnInputCancelResult {
236    pub target: PendingTurnInputCancelTarget,
237    pub outcome: PendingTurnInputCancelOutcome,
238}
239
240#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
241#[serde(tag = "outcome", content = "data", rename_all = "snake_case")]
242pub enum PendingTurnInputSuffixCancelOutcome {
243    AnchorNotFound {
244        anchor: PendingTurnInputCancelTarget,
245    },
246    Outcomes {
247        anchor: PendingTurnInputCancelTarget,
248        outcomes: Vec<PendingTurnInputCancelOutcome>,
249    },
250}
251
252#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
253#[serde(tag = "kind", rename_all = "snake_case")]
254pub enum TurnInputClaimMode {
255    ActiveTurn {
256        turn_id: String,
257        checkpoint: CheckpointKind,
258    },
259    NextTurn,
260}
261
262#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
263pub struct TurnInputCompletion {
264    pub session_id: String,
265    pub claim_id: String,
266    pub lease_token: String,
267    pub input_ids: Vec<String>,
268}
269
270#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
271pub struct TurnInputClaim {
272    pub session_id: String,
273    pub claim_id: String,
274    pub owner: crate::LeaseOwnerIdentity,
275    pub lease_token: String,
276    pub fencing_token: u64,
277    pub claimed_at_epoch_ms: u64,
278    pub expires_at_epoch_ms: u64,
279    pub mode: TurnInputClaimMode,
280    pub inputs: Vec<PendingTurnInput>,
281}
282
283impl TurnInputClaim {
284    pub fn completion(&self) -> TurnInputCompletion {
285        TurnInputCompletion {
286            session_id: self.session_id.clone(),
287            claim_id: self.claim_id.clone(),
288            lease_token: self.lease_token.clone(),
289            input_ids: self
290                .inputs
291                .iter()
292                .map(|input| input.input_id.clone())
293                .collect(),
294        }
295    }
296
297    pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
298        self.inputs
299            .iter()
300            .filter_map(PendingTurnInput::accepted_input)
301            .collect()
302    }
303
304    pub async fn materialize_for_checkpoint(
305        &self,
306        attachment_store: &dyn crate::AttachmentStore,
307    ) -> Result<QueuedCheckpointTurnInput, String> {
308        let mut transient_messages = Vec::new();
309        for input in &self.inputs {
310            if let Some(message) =
311                plugin_message_from_turn_input_with_attachments(&input.input, attachment_store)
312                    .await?
313            {
314                transient_messages.push(message);
315            }
316        }
317        Ok(QueuedCheckpointTurnInput {
318            transient_messages,
319            turn_causes: Vec::new(),
320        })
321    }
322
323    pub fn materialize_for_turn(&self) -> TurnInput {
324        let mut input_items = Vec::new();
325        let mut image_blobs = std::collections::HashMap::new();
326        let mut protocol_turn_options = None;
327        let mut trace_turn_id = None;
328        for pending in &self.inputs {
329            input_items.extend(pending.input.items.clone());
330            image_blobs.extend(pending.input.image_blobs.clone());
331            if protocol_turn_options.is_none() {
332                protocol_turn_options = pending.input.protocol_turn_options.clone();
333            }
334            if trace_turn_id.is_none() {
335                trace_turn_id = pending.input.trace_turn_id.clone();
336            }
337        }
338        TurnInput {
339            items: input_items,
340            image_blobs,
341            protocol_turn_options,
342            trace_turn_id,
343            protocol_extension: None,
344            turn_context: crate::TurnContext::default(),
345        }
346    }
347}
348
349#[derive(Clone, Debug, Default)]
350pub struct QueuedCheckpointTurnInput {
351    pub transient_messages: Vec<PluginMessage>,
352    pub turn_causes: Vec<TurnCause>,
353}
354
355pub(crate) fn source_key_display_id(source: &str) -> String {
356    source
357        .strip_prefix("host:")
358        .or_else(|| source.strip_prefix("injection:"))
359        .unwrap_or(source)
360        .to_string()
361}
362
363pub(crate) fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
364    let mut text = Vec::new();
365    let mut images = Vec::new();
366    for item in &input.items {
367        match item {
368            crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
369                text.push(item_text.clone());
370            }
371            crate::InputItem::Text { .. } => {}
372            crate::InputItem::ImageRef { id } => {
373                if let Some(bytes) = input.image_blobs.get(id).cloned() {
374                    images.push(bytes);
375                }
376            }
377        }
378    }
379    if text.is_empty() && images.is_empty() {
380        return None;
381    }
382    Some(PluginMessage {
383        role: crate::MessageRole::User,
384        content: text.join("\n"),
385        origin: None,
386        parts: Vec::new(),
387        images,
388    })
389}
390
391pub(crate) async fn plugin_message_from_turn_input_with_attachments(
392    input: &TurnInput,
393    attachment_store: &dyn crate::AttachmentStore,
394) -> Result<Option<PluginMessage>, String> {
395    let normalized =
396        super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)
397            .await?;
398    let has_image = normalized
399        .iter()
400        .any(|item| matches!(item, super::NormalizedItem::Image(_)));
401    if !has_image {
402        return Ok(plugin_message_from_turn_input(input));
403    }
404
405    let mut content = Vec::new();
406    let mut parts = Vec::new();
407    for item in normalized {
408        match item {
409            super::NormalizedItem::Text(text) if !text.is_empty() => {
410                let part_id = format!("pending.p{}", parts.len());
411                content.push(text.clone());
412                parts.push(crate::Part {
413                    id: part_id,
414                    kind: crate::PartKind::Text,
415                    content: text,
416                    attachment: None,
417                    tool_call_id: None,
418                    tool_name: None,
419                    tool_replay: None,
420                    prune_state: crate::PruneState::Intact,
421                    reasoning_meta: None,
422                    response_meta: None,
423                });
424            }
425            super::NormalizedItem::Text(_) => {}
426            super::NormalizedItem::Image(reference) => {
427                let part_id = format!("pending.p{}", parts.len());
428                parts.push(crate::Part {
429                    id: part_id,
430                    kind: crate::PartKind::Image,
431                    content: String::new(),
432                    attachment: Some(crate::session_model::message::PartAttachment { reference }),
433                    tool_call_id: None,
434                    tool_name: None,
435                    tool_replay: None,
436                    prune_state: crate::PruneState::Intact,
437                    reasoning_meta: None,
438                    response_meta: None,
439                });
440            }
441        }
442    }
443    if parts.is_empty() {
444        return Ok(None);
445    }
446    Ok(Some(PluginMessage {
447        role: crate::MessageRole::User,
448        content: content.join("\n"),
449        origin: None,
450        parts,
451        images: Vec::new(),
452    }))
453}