Skip to main content

lash_core/runtime/
turn_queue.rs

1use super::process::ProcessWakeDelivery;
2use crate::{PluginMessage, TurnCause, TurnInput};
3
4pub const QUEUED_WORK_CLAIM_TTL_MS: u64 = 15 * 60 * 1000;
5
6#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
7#[serde(tag = "kind", rename_all = "snake_case")]
8pub enum SessionCommand {
9    RefreshToolSurface {
10        reason: String,
11        #[serde(default, skip_serializing_if = "Option::is_none")]
12        expected_generation: Option<u64>,
13    },
14    ResetSession {
15        reason: String,
16    },
17}
18
19impl SessionCommand {
20    pub fn kind(&self) -> &'static str {
21        match self {
22            Self::RefreshToolSurface { .. } => "refresh_tool_surface",
23            Self::ResetSession { .. } => "reset_session",
24        }
25    }
26
27    pub fn source_key(&self, idempotency_key: impl AsRef<str>) -> String {
28        format!("command:{}:{}", self.kind(), idempotency_key.as_ref())
29    }
30}
31
32#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
33pub struct SessionCommandReceipt {
34    pub session_id: String,
35    pub batch_id: String,
36    pub source_key: String,
37}
38
39#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum DeliveryPolicy {
42    EarliestSafeBoundary,
43    AfterCurrentTurnCommit,
44}
45
46impl DeliveryPolicy {
47    pub fn as_str(self) -> &'static str {
48        match self {
49            Self::EarliestSafeBoundary => "earliest_safe_boundary",
50            Self::AfterCurrentTurnCommit => "after_current_turn_commit",
51        }
52    }
53
54    pub fn from_wire_str(value: &str) -> Option<Self> {
55        match value {
56            "earliest_safe_boundary" => Some(Self::EarliestSafeBoundary),
57            "after_current_turn_commit" => Some(Self::AfterCurrentTurnCommit),
58            _ => None,
59        }
60    }
61}
62
63#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub enum SlotPolicy {
66    Join,
67    Exclusive,
68}
69
70impl SlotPolicy {
71    pub fn as_str(self) -> &'static str {
72        match self {
73            Self::Join => "join",
74            Self::Exclusive => "exclusive",
75        }
76    }
77
78    pub fn from_wire_str(value: &str) -> Option<Self> {
79        match value {
80            "join" => Some(Self::Join),
81            "exclusive" => Some(Self::Exclusive),
82            _ => None,
83        }
84    }
85}
86
87#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
88#[serde(rename_all = "snake_case")]
89pub enum MergeKey {
90    Never,
91    PayloadDefault,
92    Group(String),
93}
94
95#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
96#[serde(tag = "type", rename_all = "snake_case")]
97pub enum QueuedWorkPayload {
98    TurnInput { input: Box<TurnInput> },
99    ProcessWake { wake: Box<ProcessWakeDelivery> },
100    SessionCommand { command: Box<SessionCommand> },
101}
102
103impl QueuedWorkPayload {
104    pub fn turn_input(input: TurnInput) -> Self {
105        Self::TurnInput {
106            input: Box::new(input),
107        }
108    }
109
110    pub fn process_wake(wake: ProcessWakeDelivery) -> Self {
111        Self::ProcessWake {
112            wake: Box::new(wake),
113        }
114    }
115
116    pub fn session_command(command: SessionCommand) -> Self {
117        Self::SessionCommand {
118            command: Box::new(command),
119        }
120    }
121}
122
123#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
124pub struct QueuedWorkItem {
125    pub item_id: String,
126    pub payload: QueuedWorkPayload,
127}
128
129#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
130pub struct QueuedWorkBatch {
131    pub batch_id: String,
132    pub session_id: String,
133    pub enqueue_seq: u64,
134    #[serde(default, skip_serializing_if = "Option::is_none")]
135    pub source_key: Option<String>,
136    pub delivery_policy: DeliveryPolicy,
137    pub slot_policy: SlotPolicy,
138    pub merge_key: MergeKey,
139    pub available_at_ms: u64,
140    pub enqueued_at_ms: u64,
141    pub items: Vec<QueuedWorkItem>,
142}
143
144#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
145pub struct QueuedWorkBatchDraft {
146    pub session_id: String,
147    #[serde(default, skip_serializing_if = "Option::is_none")]
148    pub source_key: Option<String>,
149    pub delivery_policy: DeliveryPolicy,
150    pub slot_policy: SlotPolicy,
151    pub merge_key: MergeKey,
152    pub available_at_ms: u64,
153    pub payloads: Vec<QueuedWorkPayload>,
154}
155
156impl QueuedWorkBatchDraft {
157    pub fn new(
158        session_id: impl Into<String>,
159        delivery_policy: DeliveryPolicy,
160        slot_policy: SlotPolicy,
161        payloads: impl Into<Vec<QueuedWorkPayload>>,
162    ) -> Self {
163        Self {
164            session_id: session_id.into(),
165            source_key: None,
166            delivery_policy,
167            slot_policy,
168            merge_key: MergeKey::Never,
169            available_at_ms: 0,
170            payloads: payloads.into(),
171        }
172    }
173
174    pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
175        self.source_key = Some(source_key.into());
176        self
177    }
178
179    pub fn with_available_at_ms(mut self, available_at_ms: u64) -> Self {
180        self.available_at_ms = available_at_ms;
181        self
182    }
183
184    pub fn with_merge_key(mut self, merge_key: MergeKey) -> Self {
185        self.merge_key = merge_key;
186        self
187    }
188}
189
190#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
191#[serde(rename_all = "snake_case")]
192pub enum QueuedWorkClaimBoundary {
193    ActiveTurnCheckpoint,
194    Idle,
195}
196
197#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
198pub struct QueuedWorkCompletion {
199    pub session_id: String,
200    pub claim_id: String,
201    pub lease_token: String,
202    pub batch_ids: Vec<String>,
203}
204
205#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
206pub struct QueuedWorkClaim {
207    pub session_id: String,
208    pub claim_id: String,
209    pub owner_id: String,
210    pub lease_token: String,
211    pub fencing_token: u64,
212    pub claimed_at_epoch_ms: u64,
213    pub expires_at_epoch_ms: u64,
214    pub batches: Vec<QueuedWorkBatch>,
215}
216
217impl QueuedWorkClaim {
218    pub fn completion(&self) -> QueuedWorkCompletion {
219        QueuedWorkCompletion {
220            session_id: self.session_id.clone(),
221            claim_id: self.claim_id.clone(),
222            lease_token: self.lease_token.clone(),
223            batch_ids: self
224                .batches
225                .iter()
226                .map(|batch| batch.batch_id.clone())
227                .collect(),
228        }
229    }
230
231    pub fn is_empty(&self) -> bool {
232        self.batches.iter().all(|batch| batch.items.is_empty())
233    }
234
235    pub fn materialize_for_checkpoint(&self) -> QueuedCheckpointWork {
236        let messages = Vec::new();
237        let mut transient_messages = Vec::new();
238        let mut turn_causes = Vec::new();
239        for batch in &self.batches {
240            for item in &batch.items {
241                match &item.payload {
242                    QueuedWorkPayload::TurnInput { input } => {
243                        if let Some(message) = plugin_message_from_turn_input(input) {
244                            transient_messages.push(message);
245                        }
246                    }
247                    QueuedWorkPayload::ProcessWake { wake } => {
248                        turn_causes.push(crate::process_wake_turn_cause(wake));
249                    }
250                    QueuedWorkPayload::SessionCommand { .. } => {}
251                }
252            }
253        }
254        QueuedCheckpointWork {
255            messages,
256            transient_messages,
257            turn_causes,
258        }
259    }
260
261    pub fn materialize_for_checkpoint_with_attachments(
262        &self,
263        attachment_store: &dyn crate::AttachmentStore,
264    ) -> Result<QueuedCheckpointWork, String> {
265        let messages = Vec::new();
266        let mut transient_messages = Vec::new();
267        let mut turn_causes = Vec::new();
268        for batch in &self.batches {
269            for item in &batch.items {
270                match &item.payload {
271                    QueuedWorkPayload::TurnInput { input } => {
272                        if let Some(message) = plugin_message_from_turn_input_with_attachments(
273                            input,
274                            attachment_store,
275                        )? {
276                            transient_messages.push(message);
277                        }
278                    }
279                    QueuedWorkPayload::ProcessWake { wake } => {
280                        turn_causes.push(crate::process_wake_turn_cause(wake));
281                    }
282                    QueuedWorkPayload::SessionCommand { .. } => {}
283                }
284            }
285        }
286        Ok(QueuedCheckpointWork {
287            messages,
288            transient_messages,
289            turn_causes,
290        })
291    }
292
293    pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
294        let mut accepted = Vec::new();
295        for batch in &self.batches {
296            let id = batch.source_key.as_deref().map(|source| {
297                source
298                    .strip_prefix("host:")
299                    .or_else(|| source.strip_prefix("injection:"))
300                    .unwrap_or(source)
301                    .to_string()
302            });
303            for item in &batch.items {
304                if let QueuedWorkPayload::TurnInput { input } = &item.payload
305                    && let Some(message) = plugin_message_from_turn_input(input)
306                {
307                    accepted.push(crate::AcceptedInjectedTurnInput {
308                        id: id.clone(),
309                        message,
310                    });
311                }
312            }
313        }
314        accepted
315    }
316
317    pub fn exclusive_session_command(&self) -> Option<(&QueuedWorkBatch, &SessionCommand)> {
318        if self.batches.len() != 1 {
319            return None;
320        }
321        let batch = self.batches.first()?;
322        if batch.slot_policy != SlotPolicy::Exclusive || batch.items.len() != 1 {
323            return None;
324        }
325        let item = batch.items.first()?;
326        match &item.payload {
327            QueuedWorkPayload::SessionCommand { command } => Some((batch, command.as_ref())),
328            _ => None,
329        }
330    }
331
332    pub fn materialize_for_turn(&self) -> QueuedTurnWork {
333        let checkpoint = self.materialize_for_checkpoint();
334        let mut input_items = Vec::new();
335        let mut image_blobs = std::collections::HashMap::new();
336        let mut protocol_turn_options = None;
337        let mut trace_turn_id = None;
338        for batch in &self.batches {
339            for item in &batch.items {
340                if let QueuedWorkPayload::TurnInput { input } = &item.payload {
341                    input_items.extend(input.items.clone());
342                    image_blobs.extend(input.image_blobs.clone());
343                    if protocol_turn_options.is_none() {
344                        protocol_turn_options = input.protocol_turn_options.clone();
345                    }
346                    if trace_turn_id.is_none() {
347                        trace_turn_id = input.trace_turn_id.clone();
348                    }
349                }
350            }
351        }
352        QueuedTurnWork {
353            input: TurnInput {
354                items: input_items,
355                image_blobs,
356                protocol_turn_options,
357                trace_turn_id,
358                protocol_extension: None,
359                turn_context: crate::TurnContext::default(),
360            },
361            messages: checkpoint.messages,
362            turn_causes: checkpoint.turn_causes,
363        }
364    }
365}
366
367#[derive(Clone, Debug, Default)]
368pub struct QueuedCheckpointWork {
369    pub messages: Vec<PluginMessage>,
370    pub transient_messages: Vec<PluginMessage>,
371    pub turn_causes: Vec<TurnCause>,
372}
373
374#[derive(Clone, Debug)]
375pub struct QueuedTurnWork {
376    pub input: TurnInput,
377    pub messages: Vec<PluginMessage>,
378    pub turn_causes: Vec<TurnCause>,
379}
380
381pub fn process_wake_batch_draft(wake: ProcessWakeDelivery) -> QueuedWorkBatchDraft {
382    let source_key = format!("process:{}:event:{}:wake", wake.process_id, wake.sequence);
383    QueuedWorkBatchDraft::new(
384        wake.target_session_id.clone(),
385        DeliveryPolicy::EarliestSafeBoundary,
386        SlotPolicy::Exclusive,
387        vec![QueuedWorkPayload::process_wake(wake)],
388    )
389    .with_source_key(source_key)
390}
391
392fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
393    let mut text = Vec::new();
394    let mut images = Vec::new();
395    for item in &input.items {
396        match item {
397            crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
398                text.push(item_text.clone());
399            }
400            crate::InputItem::Text { .. } => {}
401            crate::InputItem::ImageRef { id } => {
402                if let Some(bytes) = input.image_blobs.get(id).cloned() {
403                    images.push(bytes);
404                }
405            }
406        }
407    }
408    if text.is_empty() && images.is_empty() {
409        return None;
410    }
411    Some(PluginMessage {
412        role: crate::MessageRole::User,
413        content: text.join("\n"),
414        origin: None,
415        parts: Vec::new(),
416        images,
417    })
418}
419
420fn plugin_message_from_turn_input_with_attachments(
421    input: &TurnInput,
422    attachment_store: &dyn crate::AttachmentStore,
423) -> Result<Option<PluginMessage>, String> {
424    let normalized =
425        super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)?;
426    let has_image = normalized
427        .iter()
428        .any(|item| matches!(item, super::NormalizedItem::Image(_)));
429    if !has_image {
430        return Ok(plugin_message_from_turn_input(input));
431    }
432
433    let mut content = Vec::new();
434    let mut parts = Vec::new();
435    for item in normalized {
436        match item {
437            super::NormalizedItem::Text(text) if !text.is_empty() => {
438                let part_id = format!("queued.p{}", parts.len());
439                content.push(text.clone());
440                parts.push(crate::Part {
441                    id: part_id,
442                    kind: crate::PartKind::Text,
443                    content: text,
444                    attachment: None,
445                    tool_call_id: None,
446                    tool_name: None,
447                    tool_replay: None,
448                    prune_state: crate::PruneState::Intact,
449                    reasoning_meta: None,
450                    response_meta: None,
451                });
452            }
453            super::NormalizedItem::Text(_) => {}
454            super::NormalizedItem::Image(reference) => {
455                let part_id = format!("queued.p{}", parts.len());
456                parts.push(crate::Part {
457                    id: part_id,
458                    kind: crate::PartKind::Image,
459                    content: String::new(),
460                    attachment: Some(crate::session_model::message::PartAttachment { reference }),
461                    tool_call_id: None,
462                    tool_name: None,
463                    tool_replay: None,
464                    prune_state: crate::PruneState::Intact,
465                    reasoning_meta: None,
466                    response_meta: None,
467                });
468            }
469        }
470    }
471    if parts.is_empty() {
472        return Ok(None);
473    }
474    Ok(Some(PluginMessage {
475        role: crate::MessageRole::User,
476        content: content.join("\n"),
477        origin: None,
478        parts,
479        images: Vec::new(),
480    }))
481}