Skip to main content

lash_core/runtime/
turn_queue.rs

1use super::process::ProcessWakeDelivery;
2use crate::{PluginMessage, TurnCause, TurnInput};
3
4pub const QUEUED_WORK_CLAIM_TTL_MS: u64 = 15 * 60 * 1000;
5
6#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
7#[serde(tag = "kind", rename_all = "snake_case")]
8pub enum SessionCommand {
9    // No generation guard: the command drains asynchronously, so any
10    // generation observed at enqueue time may legitimately have advanced by
11    // drain time, and the refresh recomputes the surface from live sources
12    // regardless — a guard could only fail spuriously.
13    RefreshToolSurface { reason: String },
14    ResetSession { reason: String },
15}
16
17impl SessionCommand {
18    pub fn kind(&self) -> &'static str {
19        match self {
20            Self::RefreshToolSurface { .. } => "refresh_tool_surface",
21            Self::ResetSession { .. } => "reset_session",
22        }
23    }
24
25    pub fn source_key(&self, idempotency_key: impl AsRef<str>) -> String {
26        format!("command:{}:{}", self.kind(), idempotency_key.as_ref())
27    }
28}
29
30#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
31pub struct SessionCommandReceipt {
32    pub session_id: String,
33    pub batch_id: String,
34    pub source_key: String,
35}
36
37#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum DeliveryPolicy {
40    EarliestSafeBoundary,
41    AfterCurrentTurnCommit,
42}
43
44impl DeliveryPolicy {
45    pub fn as_str(self) -> &'static str {
46        match self {
47            Self::EarliestSafeBoundary => "earliest_safe_boundary",
48            Self::AfterCurrentTurnCommit => "after_current_turn_commit",
49        }
50    }
51
52    pub fn from_wire_str(value: &str) -> Option<Self> {
53        match value {
54            "earliest_safe_boundary" => Some(Self::EarliestSafeBoundary),
55            "after_current_turn_commit" => Some(Self::AfterCurrentTurnCommit),
56            _ => None,
57        }
58    }
59}
60
61#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum SlotPolicy {
64    Join,
65    Exclusive,
66}
67
68impl SlotPolicy {
69    pub fn as_str(self) -> &'static str {
70        match self {
71            Self::Join => "join",
72            Self::Exclusive => "exclusive",
73        }
74    }
75
76    pub fn from_wire_str(value: &str) -> Option<Self> {
77        match value {
78            "join" => Some(Self::Join),
79            "exclusive" => Some(Self::Exclusive),
80            _ => None,
81        }
82    }
83}
84
85#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
86#[serde(rename_all = "snake_case")]
87pub enum MergeKey {
88    Never,
89    PayloadDefault,
90    Group(String),
91}
92
93#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
94#[serde(tag = "type", rename_all = "snake_case")]
95pub enum QueuedWorkPayload {
96    TurnInput { input: Box<TurnInput> },
97    ProcessWake { wake: Box<ProcessWakeDelivery> },
98    SessionCommand { command: Box<SessionCommand> },
99}
100
101impl QueuedWorkPayload {
102    pub fn turn_input(input: TurnInput) -> Self {
103        Self::TurnInput {
104            input: Box::new(input),
105        }
106    }
107
108    pub fn process_wake(wake: ProcessWakeDelivery) -> Self {
109        Self::ProcessWake {
110            wake: Box::new(wake),
111        }
112    }
113
114    pub fn session_command(command: SessionCommand) -> Self {
115        Self::SessionCommand {
116            command: Box::new(command),
117        }
118    }
119}
120
121#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
122pub struct QueuedWorkItem {
123    pub item_id: String,
124    pub payload: QueuedWorkPayload,
125}
126
127#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
128pub struct QueuedWorkBatch {
129    pub batch_id: String,
130    pub session_id: String,
131    pub enqueue_seq: u64,
132    #[serde(default, skip_serializing_if = "Option::is_none")]
133    pub source_key: Option<String>,
134    pub delivery_policy: DeliveryPolicy,
135    pub slot_policy: SlotPolicy,
136    pub merge_key: MergeKey,
137    pub available_at_ms: u64,
138    pub enqueued_at_ms: u64,
139    pub items: Vec<QueuedWorkItem>,
140}
141
142#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
143pub struct QueuedWorkBatchDraft {
144    pub session_id: String,
145    #[serde(default, skip_serializing_if = "Option::is_none")]
146    pub source_key: Option<String>,
147    pub delivery_policy: DeliveryPolicy,
148    pub slot_policy: SlotPolicy,
149    pub merge_key: MergeKey,
150    pub available_at_ms: u64,
151    pub payloads: Vec<QueuedWorkPayload>,
152}
153
154impl QueuedWorkBatchDraft {
155    pub fn new(
156        session_id: impl Into<String>,
157        delivery_policy: DeliveryPolicy,
158        slot_policy: SlotPolicy,
159        payloads: impl Into<Vec<QueuedWorkPayload>>,
160    ) -> Self {
161        Self {
162            session_id: session_id.into(),
163            source_key: None,
164            delivery_policy,
165            slot_policy,
166            merge_key: MergeKey::Never,
167            available_at_ms: 0,
168            payloads: payloads.into(),
169        }
170    }
171
172    pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
173        self.source_key = Some(source_key.into());
174        self
175    }
176
177    pub fn with_available_at_ms(mut self, available_at_ms: u64) -> Self {
178        self.available_at_ms = available_at_ms;
179        self
180    }
181
182    pub fn with_merge_key(mut self, merge_key: MergeKey) -> Self {
183        self.merge_key = merge_key;
184        self
185    }
186}
187
188#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
189#[serde(rename_all = "snake_case")]
190pub enum QueuedWorkClaimBoundary {
191    ActiveTurnCheckpoint,
192    Idle,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
196pub struct QueuedWorkCompletion {
197    pub session_id: String,
198    pub claim_id: String,
199    pub lease_token: String,
200    pub batch_ids: Vec<String>,
201}
202
203#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
204pub struct QueuedWorkClaim {
205    pub session_id: String,
206    pub claim_id: String,
207    pub owner_id: String,
208    pub lease_token: String,
209    pub fencing_token: u64,
210    pub claimed_at_epoch_ms: u64,
211    pub expires_at_epoch_ms: u64,
212    pub batches: Vec<QueuedWorkBatch>,
213}
214
215impl QueuedWorkClaim {
216    pub fn completion(&self) -> QueuedWorkCompletion {
217        QueuedWorkCompletion {
218            session_id: self.session_id.clone(),
219            claim_id: self.claim_id.clone(),
220            lease_token: self.lease_token.clone(),
221            batch_ids: self
222                .batches
223                .iter()
224                .map(|batch| batch.batch_id.clone())
225                .collect(),
226        }
227    }
228
229    pub fn is_empty(&self) -> bool {
230        self.batches.iter().all(|batch| batch.items.is_empty())
231    }
232
233    pub fn materialize_for_checkpoint(&self) -> QueuedCheckpointWork {
234        let messages = Vec::new();
235        let mut transient_messages = Vec::new();
236        let mut turn_causes = Vec::new();
237        for batch in &self.batches {
238            for item in &batch.items {
239                match &item.payload {
240                    QueuedWorkPayload::TurnInput { input } => {
241                        if let Some(message) = plugin_message_from_turn_input(input) {
242                            transient_messages.push(message);
243                        }
244                    }
245                    QueuedWorkPayload::ProcessWake { wake } => {
246                        turn_causes.push(crate::process_wake_turn_cause(wake));
247                    }
248                    QueuedWorkPayload::SessionCommand { .. } => {}
249                }
250            }
251        }
252        QueuedCheckpointWork {
253            messages,
254            transient_messages,
255            turn_causes,
256        }
257    }
258
259    pub async fn materialize_for_checkpoint_with_attachments(
260        &self,
261        attachment_store: &dyn crate::AttachmentStore,
262    ) -> Result<QueuedCheckpointWork, String> {
263        let messages = Vec::new();
264        let mut transient_messages = Vec::new();
265        let mut turn_causes = Vec::new();
266        for batch in &self.batches {
267            for item in &batch.items {
268                match &item.payload {
269                    QueuedWorkPayload::TurnInput { input } => {
270                        if let Some(message) =
271                            plugin_message_from_turn_input_with_attachments(input, attachment_store)
272                                .await?
273                        {
274                            transient_messages.push(message);
275                        }
276                    }
277                    QueuedWorkPayload::ProcessWake { wake } => {
278                        turn_causes.push(crate::process_wake_turn_cause(wake));
279                    }
280                    QueuedWorkPayload::SessionCommand { .. } => {}
281                }
282            }
283        }
284        Ok(QueuedCheckpointWork {
285            messages,
286            transient_messages,
287            turn_causes,
288        })
289    }
290
291    pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
292        let mut accepted = Vec::new();
293        for batch in &self.batches {
294            let id = batch.source_key.as_deref().map(|source| {
295                source
296                    .strip_prefix("host:")
297                    .or_else(|| source.strip_prefix("injection:"))
298                    .unwrap_or(source)
299                    .to_string()
300            });
301            for item in &batch.items {
302                if let QueuedWorkPayload::TurnInput { input } = &item.payload
303                    && let Some(message) = plugin_message_from_turn_input(input)
304                {
305                    accepted.push(crate::AcceptedInjectedTurnInput {
306                        id: id.clone(),
307                        message,
308                    });
309                }
310            }
311        }
312        accepted
313    }
314
315    pub fn exclusive_session_command(&self) -> Option<(&QueuedWorkBatch, &SessionCommand)> {
316        if self.batches.len() != 1 {
317            return None;
318        }
319        let batch = self.batches.first()?;
320        if batch.slot_policy != SlotPolicy::Exclusive || batch.items.len() != 1 {
321            return None;
322        }
323        let item = batch.items.first()?;
324        match &item.payload {
325            QueuedWorkPayload::SessionCommand { command } => Some((batch, command.as_ref())),
326            _ => None,
327        }
328    }
329
330    pub fn materialize_for_turn(&self) -> QueuedTurnWork {
331        let checkpoint = self.materialize_for_checkpoint();
332        let mut input_items = Vec::new();
333        let mut image_blobs = std::collections::HashMap::new();
334        let mut protocol_turn_options = None;
335        let mut trace_turn_id = None;
336        for batch in &self.batches {
337            for item in &batch.items {
338                if let QueuedWorkPayload::TurnInput { input } = &item.payload {
339                    input_items.extend(input.items.clone());
340                    image_blobs.extend(input.image_blobs.clone());
341                    if protocol_turn_options.is_none() {
342                        protocol_turn_options = input.protocol_turn_options.clone();
343                    }
344                    if trace_turn_id.is_none() {
345                        trace_turn_id = input.trace_turn_id.clone();
346                    }
347                }
348            }
349        }
350        QueuedTurnWork {
351            input: TurnInput {
352                items: input_items,
353                image_blobs,
354                protocol_turn_options,
355                trace_turn_id,
356                protocol_extension: None,
357                turn_context: crate::TurnContext::default(),
358            },
359            messages: checkpoint.messages,
360            turn_causes: checkpoint.turn_causes,
361        }
362    }
363}
364
365#[derive(Clone, Debug, Default)]
366pub struct QueuedCheckpointWork {
367    pub messages: Vec<PluginMessage>,
368    pub transient_messages: Vec<PluginMessage>,
369    pub turn_causes: Vec<TurnCause>,
370}
371
372#[derive(Clone, Debug)]
373pub struct QueuedTurnWork {
374    pub input: TurnInput,
375    pub messages: Vec<PluginMessage>,
376    pub turn_causes: Vec<TurnCause>,
377}
378
379pub fn process_wake_batch_draft(wake: ProcessWakeDelivery) -> QueuedWorkBatchDraft {
380    let source_key = format!("process:{}:event:{}:wake", wake.process_id, wake.sequence);
381    QueuedWorkBatchDraft::new(
382        wake.target_session_id.clone(),
383        DeliveryPolicy::EarliestSafeBoundary,
384        SlotPolicy::Exclusive,
385        vec![QueuedWorkPayload::process_wake(wake)],
386    )
387    .with_source_key(source_key)
388}
389
390fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
391    let mut text = Vec::new();
392    let mut images = Vec::new();
393    for item in &input.items {
394        match item {
395            crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
396                text.push(item_text.clone());
397            }
398            crate::InputItem::Text { .. } => {}
399            crate::InputItem::ImageRef { id } => {
400                if let Some(bytes) = input.image_blobs.get(id).cloned() {
401                    images.push(bytes);
402                }
403            }
404        }
405    }
406    if text.is_empty() && images.is_empty() {
407        return None;
408    }
409    Some(PluginMessage {
410        role: crate::MessageRole::User,
411        content: text.join("\n"),
412        origin: None,
413        parts: Vec::new(),
414        images,
415    })
416}
417
418async fn plugin_message_from_turn_input_with_attachments(
419    input: &TurnInput,
420    attachment_store: &dyn crate::AttachmentStore,
421) -> Result<Option<PluginMessage>, String> {
422    let normalized =
423        super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)
424            .await?;
425    let has_image = normalized
426        .iter()
427        .any(|item| matches!(item, super::NormalizedItem::Image(_)));
428    if !has_image {
429        return Ok(plugin_message_from_turn_input(input));
430    }
431
432    let mut content = Vec::new();
433    let mut parts = Vec::new();
434    for item in normalized {
435        match item {
436            super::NormalizedItem::Text(text) if !text.is_empty() => {
437                let part_id = format!("queued.p{}", parts.len());
438                content.push(text.clone());
439                parts.push(crate::Part {
440                    id: part_id,
441                    kind: crate::PartKind::Text,
442                    content: text,
443                    attachment: None,
444                    tool_call_id: None,
445                    tool_name: None,
446                    tool_replay: None,
447                    prune_state: crate::PruneState::Intact,
448                    reasoning_meta: None,
449                    response_meta: None,
450                });
451            }
452            super::NormalizedItem::Text(_) => {}
453            super::NormalizedItem::Image(reference) => {
454                let part_id = format!("queued.p{}", parts.len());
455                parts.push(crate::Part {
456                    id: part_id,
457                    kind: crate::PartKind::Image,
458                    content: String::new(),
459                    attachment: Some(crate::session_model::message::PartAttachment { reference }),
460                    tool_call_id: None,
461                    tool_name: None,
462                    tool_replay: None,
463                    prune_state: crate::PruneState::Intact,
464                    reasoning_meta: None,
465                    response_meta: None,
466                });
467            }
468        }
469    }
470    if parts.is_empty() {
471        return Ok(None);
472    }
473    Ok(Some(PluginMessage {
474        role: crate::MessageRole::User,
475        content: content.join("\n"),
476        origin: None,
477        parts,
478        images: Vec::new(),
479    }))
480}