Skip to main content

lash_core/runtime/
turn_queue.rs

1use super::process::ProcessWakeDelivery;
2use crate::{PluginMessage, TurnCause, TurnInput};
3
4pub const QUEUED_WORK_CLAIM_TTL_MS: u64 = 30 * 1000;
5
6#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
7#[serde(tag = "kind", rename_all = "snake_case")]
8pub enum SessionCommand {
9    // No generation guard: the command drains asynchronously, so any
10    // generation observed at enqueue time may legitimately have advanced by
11    // drain time, and the refresh recomputes the surface from live sources
12    // regardless — a guard could only fail spuriously.
13    RefreshToolCatalog { reason: String },
14    ResetSession { reason: String },
15}
16
17impl SessionCommand {
18    pub fn kind(&self) -> &'static str {
19        match self {
20            Self::RefreshToolCatalog { .. } => "refresh_tool_catalog",
21            Self::ResetSession { .. } => "reset_session",
22        }
23    }
24
25    pub fn source_key(&self, idempotency_key: impl AsRef<str>) -> String {
26        format!("command:{}:{}", self.kind(), idempotency_key.as_ref())
27    }
28}
29
30#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
31pub struct SessionCommandReceipt {
32    pub session_id: String,
33    pub batch_id: String,
34    pub source_key: String,
35}
36
37#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum DeliveryPolicy {
40    EarliestSafeBoundary,
41    AfterCurrentTurnCommit,
42}
43
44impl DeliveryPolicy {
45    pub fn as_str(self) -> &'static str {
46        match self {
47            Self::EarliestSafeBoundary => "earliest_safe_boundary",
48            Self::AfterCurrentTurnCommit => "after_current_turn_commit",
49        }
50    }
51
52    pub fn from_wire_str(value: &str) -> Option<Self> {
53        match value {
54            "earliest_safe_boundary" => Some(Self::EarliestSafeBoundary),
55            "after_current_turn_commit" => Some(Self::AfterCurrentTurnCommit),
56            _ => None,
57        }
58    }
59}
60
61#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum SlotPolicy {
64    Join,
65    Exclusive,
66}
67
68impl SlotPolicy {
69    pub fn as_str(self) -> &'static str {
70        match self {
71            Self::Join => "join",
72            Self::Exclusive => "exclusive",
73        }
74    }
75
76    pub fn from_wire_str(value: &str) -> Option<Self> {
77        match value {
78            "join" => Some(Self::Join),
79            "exclusive" => Some(Self::Exclusive),
80            _ => None,
81        }
82    }
83}
84
85#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
86#[serde(rename_all = "snake_case")]
87pub enum MergeKey {
88    Never,
89    PayloadDefault,
90    Group(String),
91}
92
93#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
94#[serde(tag = "type", rename_all = "snake_case")]
95pub enum QueuedWorkPayload {
96    TurnInput { input: Box<TurnInput> },
97    ProcessWake { wake: Box<ProcessWakeDelivery> },
98    SessionCommand { command: Box<SessionCommand> },
99}
100
101#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
102#[serde(rename_all = "snake_case")]
103pub enum QueuedWorkClass {
104    SessionCommand,
105    TurnWork,
106}
107
108impl QueuedWorkPayload {
109    pub fn turn_input(input: TurnInput) -> Self {
110        Self::TurnInput {
111            input: Box::new(input),
112        }
113    }
114
115    pub fn process_wake(wake: ProcessWakeDelivery) -> Self {
116        Self::ProcessWake {
117            wake: Box::new(wake),
118        }
119    }
120
121    pub fn session_command(command: SessionCommand) -> Self {
122        Self::SessionCommand {
123            command: Box::new(command),
124        }
125    }
126
127    pub fn work_class(&self) -> QueuedWorkClass {
128        match self {
129            Self::SessionCommand { .. } => QueuedWorkClass::SessionCommand,
130            Self::TurnInput { .. } | Self::ProcessWake { .. } => QueuedWorkClass::TurnWork,
131        }
132    }
133}
134
135#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
136pub struct QueuedWorkItem {
137    pub item_id: String,
138    pub payload: QueuedWorkPayload,
139}
140
141#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
142pub struct QueuedWorkBatch {
143    pub batch_id: String,
144    pub session_id: String,
145    pub enqueue_seq: u64,
146    #[serde(default, skip_serializing_if = "Option::is_none")]
147    pub source_key: Option<String>,
148    pub delivery_policy: DeliveryPolicy,
149    pub slot_policy: SlotPolicy,
150    pub merge_key: MergeKey,
151    pub available_at_ms: u64,
152    pub enqueued_at_ms: u64,
153    pub items: Vec<QueuedWorkItem>,
154}
155
156impl QueuedWorkBatch {
157    pub fn work_class(&self) -> Option<QueuedWorkClass> {
158        work_class_for_payloads(self.items.iter().map(|item| &item.payload))
159    }
160
161    pub fn is_session_command_work(&self) -> bool {
162        self.work_class() == Some(QueuedWorkClass::SessionCommand)
163    }
164
165    pub fn is_turn_work(&self) -> bool {
166        self.work_class() == Some(QueuedWorkClass::TurnWork)
167    }
168}
169
170#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
171pub struct QueuedWorkBatchDraft {
172    pub session_id: String,
173    #[serde(default, skip_serializing_if = "Option::is_none")]
174    pub source_key: Option<String>,
175    pub delivery_policy: DeliveryPolicy,
176    pub slot_policy: SlotPolicy,
177    pub merge_key: MergeKey,
178    pub available_at_ms: u64,
179    pub payloads: Vec<QueuedWorkPayload>,
180}
181
182impl QueuedWorkBatchDraft {
183    pub fn new(
184        session_id: impl Into<String>,
185        delivery_policy: DeliveryPolicy,
186        slot_policy: SlotPolicy,
187        payloads: impl Into<Vec<QueuedWorkPayload>>,
188    ) -> Self {
189        Self {
190            session_id: session_id.into(),
191            source_key: None,
192            delivery_policy,
193            slot_policy,
194            merge_key: MergeKey::Never,
195            available_at_ms: 0,
196            payloads: payloads.into(),
197        }
198    }
199
200    pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
201        self.source_key = Some(source_key.into());
202        self
203    }
204
205    pub fn with_available_at_ms(mut self, available_at_ms: u64) -> Self {
206        self.available_at_ms = available_at_ms;
207        self
208    }
209
210    pub fn with_merge_key(mut self, merge_key: MergeKey) -> Self {
211        self.merge_key = merge_key;
212        self
213    }
214
215    pub fn work_class(&self) -> Option<QueuedWorkClass> {
216        work_class_for_payloads(self.payloads.iter())
217    }
218}
219
220fn work_class_for_payloads<'a>(
221    payloads: impl IntoIterator<Item = &'a QueuedWorkPayload>,
222) -> Option<QueuedWorkClass> {
223    let mut payloads = payloads.into_iter();
224    let first = payloads.next()?.work_class();
225    payloads
226        .all(|payload| payload.work_class() == first)
227        .then_some(first)
228}
229
230#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
231#[serde(rename_all = "snake_case")]
232pub enum QueuedWorkClaimBoundary {
233    ActiveTurnCheckpoint,
234    Idle,
235}
236
237#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
238pub struct QueuedWorkCompletion {
239    pub session_id: String,
240    pub claim_id: String,
241    pub lease_token: String,
242    pub batch_ids: Vec<String>,
243}
244
245#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
246pub struct QueuedWorkClaim {
247    pub session_id: String,
248    pub claim_id: String,
249    pub owner: crate::LeaseOwnerIdentity,
250    pub lease_token: String,
251    pub fencing_token: u64,
252    pub claimed_at_epoch_ms: u64,
253    pub expires_at_epoch_ms: u64,
254    pub batches: Vec<QueuedWorkBatch>,
255}
256
257impl QueuedWorkClaim {
258    pub fn completion(&self) -> QueuedWorkCompletion {
259        QueuedWorkCompletion {
260            session_id: self.session_id.clone(),
261            claim_id: self.claim_id.clone(),
262            lease_token: self.lease_token.clone(),
263            batch_ids: self
264                .batches
265                .iter()
266                .map(|batch| batch.batch_id.clone())
267                .collect(),
268        }
269    }
270
271    pub fn is_empty(&self) -> bool {
272        self.batches.iter().all(|batch| batch.items.is_empty())
273    }
274
275    pub fn materialize_for_checkpoint(&self) -> QueuedCheckpointWork {
276        let messages = Vec::new();
277        let mut transient_messages = Vec::new();
278        let mut turn_causes = Vec::new();
279        for batch in &self.batches {
280            for item in &batch.items {
281                match &item.payload {
282                    QueuedWorkPayload::TurnInput { input } => {
283                        if let Some(message) = plugin_message_from_turn_input(input) {
284                            transient_messages.push(message);
285                        }
286                    }
287                    QueuedWorkPayload::ProcessWake { wake } => {
288                        turn_causes.push(crate::process_wake_turn_cause(wake));
289                    }
290                    QueuedWorkPayload::SessionCommand { .. } => {}
291                }
292            }
293        }
294        QueuedCheckpointWork {
295            messages,
296            transient_messages,
297            turn_causes,
298        }
299    }
300
301    pub async fn materialize_for_checkpoint_with_attachments(
302        &self,
303        attachment_store: &dyn crate::AttachmentStore,
304    ) -> Result<QueuedCheckpointWork, String> {
305        let messages = Vec::new();
306        let mut transient_messages = Vec::new();
307        let mut turn_causes = Vec::new();
308        for batch in &self.batches {
309            for item in &batch.items {
310                match &item.payload {
311                    QueuedWorkPayload::TurnInput { input } => {
312                        if let Some(message) =
313                            plugin_message_from_turn_input_with_attachments(input, attachment_store)
314                                .await?
315                        {
316                            transient_messages.push(message);
317                        }
318                    }
319                    QueuedWorkPayload::ProcessWake { wake } => {
320                        turn_causes.push(crate::process_wake_turn_cause(wake));
321                    }
322                    QueuedWorkPayload::SessionCommand { .. } => {}
323                }
324            }
325        }
326        Ok(QueuedCheckpointWork {
327            messages,
328            transient_messages,
329            turn_causes,
330        })
331    }
332
333    pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
334        let mut accepted = Vec::new();
335        for batch in &self.batches {
336            let id = batch.source_key.as_deref().map(|source| {
337                source
338                    .strip_prefix("host:")
339                    .or_else(|| source.strip_prefix("injection:"))
340                    .unwrap_or(source)
341                    .to_string()
342            });
343            for item in &batch.items {
344                if let QueuedWorkPayload::TurnInput { input } = &item.payload
345                    && let Some(message) = plugin_message_from_turn_input(input)
346                {
347                    accepted.push(crate::AcceptedInjectedTurnInput {
348                        id: id.clone(),
349                        message,
350                    });
351                }
352            }
353        }
354        accepted
355    }
356
357    pub fn exclusive_session_command(&self) -> Option<(&QueuedWorkBatch, &SessionCommand)> {
358        if self.batches.len() != 1 {
359            return None;
360        }
361        let batch = self.batches.first()?;
362        if batch.slot_policy != SlotPolicy::Exclusive || batch.items.len() != 1 {
363            return None;
364        }
365        let item = batch.items.first()?;
366        match &item.payload {
367            QueuedWorkPayload::SessionCommand { command } => Some((batch, command.as_ref())),
368            _ => None,
369        }
370    }
371
372    pub fn materialize_for_turn(&self) -> QueuedTurnWork {
373        let checkpoint = self.materialize_for_checkpoint();
374        let mut input_items = Vec::new();
375        let mut image_blobs = std::collections::HashMap::new();
376        let mut protocol_turn_options = None;
377        let mut trace_turn_id = None;
378        for batch in &self.batches {
379            for item in &batch.items {
380                if let QueuedWorkPayload::TurnInput { input } = &item.payload {
381                    input_items.extend(input.items.clone());
382                    image_blobs.extend(input.image_blobs.clone());
383                    if protocol_turn_options.is_none() {
384                        protocol_turn_options = input.protocol_turn_options.clone();
385                    }
386                    if trace_turn_id.is_none() {
387                        trace_turn_id = input.trace_turn_id.clone();
388                    }
389                }
390            }
391        }
392        QueuedTurnWork {
393            input: TurnInput {
394                items: input_items,
395                image_blobs,
396                protocol_turn_options,
397                trace_turn_id,
398                protocol_extension: None,
399                turn_context: crate::TurnContext::default(),
400            },
401            messages: checkpoint.messages,
402            turn_causes: checkpoint.turn_causes,
403        }
404    }
405}
406
407#[derive(Clone, Debug, Default)]
408pub struct QueuedCheckpointWork {
409    pub messages: Vec<PluginMessage>,
410    pub transient_messages: Vec<PluginMessage>,
411    pub turn_causes: Vec<TurnCause>,
412}
413
414#[derive(Clone, Debug)]
415pub struct QueuedTurnWork {
416    pub input: TurnInput,
417    pub messages: Vec<PluginMessage>,
418    pub turn_causes: Vec<TurnCause>,
419}
420
421pub fn process_wake_batch_draft(wake: ProcessWakeDelivery) -> QueuedWorkBatchDraft {
422    let source_key = format!("process:{}:event:{}:wake", wake.process_id, wake.sequence);
423    QueuedWorkBatchDraft::new(
424        wake.target_session_id.clone(),
425        DeliveryPolicy::EarliestSafeBoundary,
426        SlotPolicy::Exclusive,
427        vec![QueuedWorkPayload::process_wake(wake)],
428    )
429    .with_source_key(source_key)
430}
431
432fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
433    let mut text = Vec::new();
434    let mut images = Vec::new();
435    for item in &input.items {
436        match item {
437            crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
438                text.push(item_text.clone());
439            }
440            crate::InputItem::Text { .. } => {}
441            crate::InputItem::ImageRef { id } => {
442                if let Some(bytes) = input.image_blobs.get(id).cloned() {
443                    images.push(bytes);
444                }
445            }
446        }
447    }
448    if text.is_empty() && images.is_empty() {
449        return None;
450    }
451    Some(PluginMessage {
452        role: crate::MessageRole::User,
453        content: text.join("\n"),
454        origin: None,
455        parts: Vec::new(),
456        images,
457    })
458}
459
460async fn plugin_message_from_turn_input_with_attachments(
461    input: &TurnInput,
462    attachment_store: &dyn crate::AttachmentStore,
463) -> Result<Option<PluginMessage>, String> {
464    let normalized =
465        super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)
466            .await?;
467    let has_image = normalized
468        .iter()
469        .any(|item| matches!(item, super::NormalizedItem::Image(_)));
470    if !has_image {
471        return Ok(plugin_message_from_turn_input(input));
472    }
473
474    let mut content = Vec::new();
475    let mut parts = Vec::new();
476    for item in normalized {
477        match item {
478            super::NormalizedItem::Text(text) if !text.is_empty() => {
479                let part_id = format!("queued.p{}", parts.len());
480                content.push(text.clone());
481                parts.push(crate::Part {
482                    id: part_id,
483                    kind: crate::PartKind::Text,
484                    content: text,
485                    attachment: None,
486                    tool_call_id: None,
487                    tool_name: None,
488                    tool_replay: None,
489                    prune_state: crate::PruneState::Intact,
490                    reasoning_meta: None,
491                    response_meta: None,
492                });
493            }
494            super::NormalizedItem::Text(_) => {}
495            super::NormalizedItem::Image(reference) => {
496                let part_id = format!("queued.p{}", parts.len());
497                parts.push(crate::Part {
498                    id: part_id,
499                    kind: crate::PartKind::Image,
500                    content: String::new(),
501                    attachment: Some(crate::session_model::message::PartAttachment { reference }),
502                    tool_call_id: None,
503                    tool_name: None,
504                    tool_replay: None,
505                    prune_state: crate::PruneState::Intact,
506                    reasoning_meta: None,
507                    response_meta: None,
508                });
509            }
510        }
511    }
512    if parts.is_empty() {
513        return Ok(None);
514    }
515    Ok(Some(PluginMessage {
516        role: crate::MessageRole::User,
517        content: content.join("\n"),
518        origin: None,
519        parts,
520        images: Vec::new(),
521    }))
522}