Skip to main content

lash_core/runtime/
turn_input_ingress.rs

1use crate::{CheckpointKind, PluginMessage, TurnCause, TurnInput};
2
3#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
4#[serde(tag = "scope", rename_all = "snake_case")]
5pub enum TurnInputIngress {
6    ActiveTurn {
7        turn_id: String,
8        #[serde(default)]
9        min_boundary: TurnInputCheckpointBoundary,
10    },
11    NextTurn,
12}
13
14impl TurnInputIngress {
15    pub fn active_turn(
16        turn_id: impl Into<String>,
17        min_boundary: TurnInputCheckpointBoundary,
18    ) -> Self {
19        Self::ActiveTurn {
20            turn_id: turn_id.into(),
21            min_boundary,
22        }
23    }
24
25    pub fn next_turn() -> Self {
26        Self::NextTurn
27    }
28
29    pub fn active_turn_id(&self) -> Option<&str> {
30        match self {
31            Self::ActiveTurn { turn_id, .. } => Some(turn_id),
32            Self::NextTurn => None,
33        }
34    }
35
36    pub fn admits_checkpoint(&self, checkpoint: CheckpointKind) -> bool {
37        match self {
38            Self::ActiveTurn { min_boundary, .. } => min_boundary.admits(checkpoint),
39            Self::NextTurn => false,
40        }
41    }
42}
43
44#[derive(
45    Clone, Copy, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
46)]
47#[serde(rename_all = "snake_case")]
48pub enum TurnInputCheckpointBoundary {
49    #[default]
50    AfterWork,
51    BeforeCompletion,
52}
53
54impl TurnInputCheckpointBoundary {
55    pub fn admits(self, checkpoint: CheckpointKind) -> bool {
56        match self {
57            Self::AfterWork => true,
58            Self::BeforeCompletion => checkpoint == CheckpointKind::BeforeCompletion,
59        }
60    }
61}
62
63#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub enum TurnInputState {
66    PendingActive,
67    DeferredNextTurn,
68    Accepted,
69    Cancelled,
70    Completed,
71}
72
73impl TurnInputState {
74    pub fn as_str(self) -> &'static str {
75        match self {
76            Self::PendingActive => "pending_active",
77            Self::DeferredNextTurn => "deferred_next_turn",
78            Self::Accepted => "accepted",
79            Self::Cancelled => "cancelled",
80            Self::Completed => "completed",
81        }
82    }
83
84    pub fn from_wire_str(value: &str) -> Option<Self> {
85        match value {
86            "pending_active" => Some(Self::PendingActive),
87            "deferred_next_turn" => Some(Self::DeferredNextTurn),
88            "accepted" => Some(Self::Accepted),
89            "cancelled" => Some(Self::Cancelled),
90            "completed" => Some(Self::Completed),
91            _ => None,
92        }
93    }
94
95    pub fn is_next_turn_pending(self) -> bool {
96        matches!(self, Self::DeferredNextTurn)
97    }
98}
99
100#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
101pub struct PendingTurnInputDraft {
102    pub session_id: String,
103    #[serde(default, skip_serializing_if = "Option::is_none")]
104    pub input_id: Option<String>,
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub source_key: Option<String>,
107    pub ingress: TurnInputIngress,
108    pub input: TurnInput,
109}
110
111impl PendingTurnInputDraft {
112    pub fn new(session_id: impl Into<String>, ingress: TurnInputIngress, input: TurnInput) -> Self {
113        Self {
114            session_id: session_id.into(),
115            input_id: None,
116            source_key: None,
117            ingress,
118            input,
119        }
120    }
121
122    pub fn with_input_id(mut self, input_id: impl Into<String>) -> Self {
123        self.input_id = Some(input_id.into());
124        self
125    }
126
127    pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
128        self.source_key = Some(source_key.into());
129        self
130    }
131
132    pub fn submitted_content_matches(
133        &self,
134        existing: &PendingTurnInput,
135    ) -> Result<bool, serde_json::Error> {
136        Ok(self.ingress == existing.ingress
137            && serde_json::to_value(&self.input)? == serde_json::to_value(&existing.input)?)
138    }
139}
140
141#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
142pub struct PendingTurnInput {
143    pub input_id: String,
144    pub session_id: String,
145    pub enqueue_seq: u64,
146    #[serde(default, skip_serializing_if = "Option::is_none")]
147    pub source_key: Option<String>,
148    pub ingress: TurnInputIngress,
149    pub state: TurnInputState,
150    pub enqueued_at_ms: u64,
151    pub input: TurnInput,
152}
153
154impl PendingTurnInput {
155    pub fn source_or_id(&self) -> &str {
156        self.source_key.as_deref().unwrap_or(&self.input_id)
157    }
158
159    pub fn accepted_input(&self) -> Option<crate::AcceptedInjectedTurnInput> {
160        plugin_message_from_turn_input(&self.input).map(|message| {
161            crate::AcceptedInjectedTurnInput {
162                id: self
163                    .source_key
164                    .as_deref()
165                    .map(source_key_display_id)
166                    .or_else(|| Some(self.input_id.clone())),
167                message,
168            }
169        })
170    }
171}
172
173#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
174#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
175pub enum PendingTurnInputCancelTarget {
176    InputId(String),
177    SourceKey(String),
178}
179
180impl PendingTurnInputCancelTarget {
181    pub fn input_id(input_id: impl Into<String>) -> Self {
182        Self::InputId(input_id.into())
183    }
184
185    pub fn source_key(source_key: impl Into<String>) -> Self {
186        Self::SourceKey(source_key.into())
187    }
188}
189
190#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
191pub struct PendingTurnInputClaimDiagnostics {
192    pub state: TurnInputState,
193    #[serde(default, skip_serializing_if = "Option::is_none")]
194    pub claim_id: Option<String>,
195    #[serde(default, skip_serializing_if = "Option::is_none")]
196    pub claim_owner: Option<crate::LeaseOwnerIdentity>,
197    #[serde(default, skip_serializing_if = "Option::is_none")]
198    pub claim_expires_at_ms: Option<u64>,
199    pub claim_fencing_token: u64,
200}
201
202#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
203#[serde(tag = "outcome", content = "data", rename_all = "snake_case")]
204pub enum PendingTurnInputCancelOutcome {
205    Cancelled(PendingTurnInput),
206    AlreadyClaimed {
207        input: PendingTurnInput,
208        #[serde(default, skip_serializing_if = "Option::is_none")]
209        claim: Option<PendingTurnInputClaimDiagnostics>,
210    },
211    AlreadyCompleted(PendingTurnInput),
212    AlreadyCancelled(PendingTurnInput),
213    NotFound,
214}
215
216impl PendingTurnInputCancelOutcome {
217    pub fn is_cancelled(&self) -> bool {
218        matches!(self, Self::Cancelled(_))
219    }
220
221    pub fn input(&self) -> Option<&PendingTurnInput> {
222        match self {
223            Self::Cancelled(input)
224            | Self::AlreadyClaimed { input, .. }
225            | Self::AlreadyCompleted(input)
226            | Self::AlreadyCancelled(input) => Some(input),
227            Self::NotFound => None,
228        }
229    }
230}
231
232#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
233pub struct PendingTurnInputCancelResult {
234    pub target: PendingTurnInputCancelTarget,
235    pub outcome: PendingTurnInputCancelOutcome,
236}
237
238#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
239#[serde(tag = "outcome", content = "data", rename_all = "snake_case")]
240pub enum PendingTurnInputSuffixCancelOutcome {
241    AnchorNotFound {
242        anchor: PendingTurnInputCancelTarget,
243    },
244    Outcomes {
245        anchor: PendingTurnInputCancelTarget,
246        outcomes: Vec<PendingTurnInputCancelOutcome>,
247    },
248}
249
250#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
251#[serde(tag = "kind", rename_all = "snake_case")]
252pub enum TurnInputClaimMode {
253    ActiveTurn {
254        turn_id: String,
255        checkpoint: CheckpointKind,
256    },
257    NextTurn,
258}
259
260#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
261pub struct TurnInputCompletion {
262    pub session_id: String,
263    pub claim_id: String,
264    pub lease_token: String,
265    pub input_ids: Vec<String>,
266}
267
268#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
269pub struct TurnInputClaim {
270    pub session_id: String,
271    pub claim_id: String,
272    pub owner: crate::LeaseOwnerIdentity,
273    pub lease_token: String,
274    pub fencing_token: u64,
275    pub claimed_at_epoch_ms: u64,
276    pub expires_at_epoch_ms: u64,
277    pub mode: TurnInputClaimMode,
278    pub inputs: Vec<PendingTurnInput>,
279}
280
281impl TurnInputClaim {
282    pub fn completion(&self) -> TurnInputCompletion {
283        TurnInputCompletion {
284            session_id: self.session_id.clone(),
285            claim_id: self.claim_id.clone(),
286            lease_token: self.lease_token.clone(),
287            input_ids: self
288                .inputs
289                .iter()
290                .map(|input| input.input_id.clone())
291                .collect(),
292        }
293    }
294
295    pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
296        self.inputs
297            .iter()
298            .filter_map(PendingTurnInput::accepted_input)
299            .collect()
300    }
301
302    pub async fn materialize_for_checkpoint(
303        &self,
304        attachment_store: &dyn crate::AttachmentStore,
305    ) -> Result<QueuedCheckpointTurnInput, String> {
306        let mut transient_messages = Vec::new();
307        for input in &self.inputs {
308            if let Some(message) =
309                plugin_message_from_turn_input_with_attachments(&input.input, attachment_store)
310                    .await?
311            {
312                transient_messages.push(message);
313            }
314        }
315        Ok(QueuedCheckpointTurnInput {
316            transient_messages,
317            turn_causes: Vec::new(),
318        })
319    }
320
321    pub fn materialize_for_turn(&self) -> TurnInput {
322        let mut input_items = Vec::new();
323        let mut image_blobs = std::collections::HashMap::new();
324        let mut protocol_turn_options = None;
325        let mut trace_turn_id = None;
326        for pending in &self.inputs {
327            input_items.extend(pending.input.items.clone());
328            image_blobs.extend(pending.input.image_blobs.clone());
329            if protocol_turn_options.is_none() {
330                protocol_turn_options = pending.input.protocol_turn_options.clone();
331            }
332            if trace_turn_id.is_none() {
333                trace_turn_id = pending.input.trace_turn_id.clone();
334            }
335        }
336        TurnInput {
337            items: input_items,
338            image_blobs,
339            protocol_turn_options,
340            trace_turn_id,
341            protocol_extension: None,
342            turn_context: crate::TurnContext::default(),
343        }
344    }
345}
346
347#[derive(Clone, Debug, Default)]
348pub struct QueuedCheckpointTurnInput {
349    pub transient_messages: Vec<PluginMessage>,
350    pub turn_causes: Vec<TurnCause>,
351}
352
353pub(crate) fn source_key_display_id(source: &str) -> String {
354    source
355        .strip_prefix("host:")
356        .or_else(|| source.strip_prefix("injection:"))
357        .unwrap_or(source)
358        .to_string()
359}
360
361pub(crate) fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
362    let mut text = Vec::new();
363    let mut images = Vec::new();
364    for item in &input.items {
365        match item {
366            crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
367                text.push(item_text.clone());
368            }
369            crate::InputItem::Text { .. } => {}
370            crate::InputItem::ImageRef { id } => {
371                if let Some(bytes) = input.image_blobs.get(id).cloned() {
372                    images.push(bytes);
373                }
374            }
375        }
376    }
377    if text.is_empty() && images.is_empty() {
378        return None;
379    }
380    Some(PluginMessage {
381        role: crate::MessageRole::User,
382        content: text.join("\n"),
383        origin: None,
384        parts: Vec::new(),
385        images,
386    })
387}
388
389pub(crate) async fn plugin_message_from_turn_input_with_attachments(
390    input: &TurnInput,
391    attachment_store: &dyn crate::AttachmentStore,
392) -> Result<Option<PluginMessage>, String> {
393    let normalized =
394        super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)
395            .await?;
396    let has_image = normalized
397        .iter()
398        .any(|item| matches!(item, super::NormalizedItem::Image(_)));
399    if !has_image {
400        return Ok(plugin_message_from_turn_input(input));
401    }
402
403    let mut content = Vec::new();
404    let mut parts = Vec::new();
405    for item in normalized {
406        match item {
407            super::NormalizedItem::Text(text) if !text.is_empty() => {
408                let part_id = format!("pending.p{}", parts.len());
409                content.push(text.clone());
410                parts.push(crate::Part {
411                    id: part_id,
412                    kind: crate::PartKind::Text,
413                    content: text,
414                    attachment: None,
415                    tool_call_id: None,
416                    tool_name: None,
417                    tool_replay: None,
418                    prune_state: crate::PruneState::Intact,
419                    reasoning_meta: None,
420                    response_meta: None,
421                });
422            }
423            super::NormalizedItem::Text(_) => {}
424            super::NormalizedItem::Image(reference) => {
425                let part_id = format!("pending.p{}", parts.len());
426                parts.push(crate::Part {
427                    id: part_id,
428                    kind: crate::PartKind::Image,
429                    content: String::new(),
430                    attachment: Some(crate::session_model::message::PartAttachment { reference }),
431                    tool_call_id: None,
432                    tool_name: None,
433                    tool_replay: None,
434                    prune_state: crate::PruneState::Intact,
435                    reasoning_meta: None,
436                    response_meta: None,
437                });
438            }
439        }
440    }
441    if parts.is_empty() {
442        return Ok(None);
443    }
444    Ok(Some(PluginMessage {
445        role: crate::MessageRole::User,
446        content: content.join("\n"),
447        origin: None,
448        parts,
449        images: Vec::new(),
450    }))
451}