1use super::process::ProcessWakeDelivery;
2use crate::{PluginMessage, TurnCause, TurnInput};
3
/// Fifteen minutes, expressed in milliseconds.
///
/// NOTE(review): by its name this is the lifetime of a queued-work claim
/// lease; the code that applies it is not in this file — confirm at the use site.
pub const QUEUED_WORK_CLAIM_TTL_MS: u64 = 15 * 60 * 1_000;
5
6#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
7#[serde(tag = "kind", rename_all = "snake_case")]
8pub enum SessionCommand {
9 RefreshToolSurface {
10 reason: String,
11 #[serde(default, skip_serializing_if = "Option::is_none")]
12 expected_generation: Option<u64>,
13 },
14 ResetSession {
15 reason: String,
16 },
17}
18
19impl SessionCommand {
20 pub fn kind(&self) -> &'static str {
21 match self {
22 Self::RefreshToolSurface { .. } => "refresh_tool_surface",
23 Self::ResetSession { .. } => "reset_session",
24 }
25 }
26
27 pub fn source_key(&self, idempotency_key: impl AsRef<str>) -> String {
28 format!("command:{}:{}", self.kind(), idempotency_key.as_ref())
29 }
30}
31
32#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
33pub struct SessionCommandReceipt {
34 pub session_id: String,
35 pub batch_id: String,
36 pub source_key: String,
37}
38
39#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum DeliveryPolicy {
42 EarliestSafeBoundary,
43 AfterCurrentTurnCommit,
44}
45
46impl DeliveryPolicy {
47 pub fn as_str(self) -> &'static str {
48 match self {
49 Self::EarliestSafeBoundary => "earliest_safe_boundary",
50 Self::AfterCurrentTurnCommit => "after_current_turn_commit",
51 }
52 }
53
54 pub fn from_wire_str(value: &str) -> Option<Self> {
55 match value {
56 "earliest_safe_boundary" => Some(Self::EarliestSafeBoundary),
57 "after_current_turn_commit" => Some(Self::AfterCurrentTurnCommit),
58 _ => None,
59 }
60 }
61}
62
63#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub enum SlotPolicy {
66 Join,
67 Exclusive,
68}
69
70impl SlotPolicy {
71 pub fn as_str(self) -> &'static str {
72 match self {
73 Self::Join => "join",
74 Self::Exclusive => "exclusive",
75 }
76 }
77
78 pub fn from_wire_str(value: &str) -> Option<Self> {
79 match value {
80 "join" => Some(Self::Join),
81 "exclusive" => Some(Self::Exclusive),
82 _ => None,
83 }
84 }
85}
86
87#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
88#[serde(rename_all = "snake_case")]
89pub enum MergeKey {
90 Never,
91 PayloadDefault,
92 Group(String),
93}
94
95#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
96#[serde(tag = "type", rename_all = "snake_case")]
97pub enum QueuedWorkPayload {
98 TurnInput { input: Box<TurnInput> },
99 ProcessWake { wake: Box<ProcessWakeDelivery> },
100 SessionCommand { command: Box<SessionCommand> },
101}
102
103impl QueuedWorkPayload {
104 pub fn turn_input(input: TurnInput) -> Self {
105 Self::TurnInput {
106 input: Box::new(input),
107 }
108 }
109
110 pub fn process_wake(wake: ProcessWakeDelivery) -> Self {
111 Self::ProcessWake {
112 wake: Box::new(wake),
113 }
114 }
115
116 pub fn session_command(command: SessionCommand) -> Self {
117 Self::SessionCommand {
118 command: Box::new(command),
119 }
120 }
121}
122
123#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
124pub struct QueuedWorkItem {
125 pub item_id: String,
126 pub payload: QueuedWorkPayload,
127}
128
129#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
130pub struct QueuedWorkBatch {
131 pub batch_id: String,
132 pub session_id: String,
133 pub enqueue_seq: u64,
134 #[serde(default, skip_serializing_if = "Option::is_none")]
135 pub source_key: Option<String>,
136 pub delivery_policy: DeliveryPolicy,
137 pub slot_policy: SlotPolicy,
138 pub merge_key: MergeKey,
139 pub available_at_ms: u64,
140 pub enqueued_at_ms: u64,
141 pub items: Vec<QueuedWorkItem>,
142}
143
144#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
145pub struct QueuedWorkBatchDraft {
146 pub session_id: String,
147 #[serde(default, skip_serializing_if = "Option::is_none")]
148 pub source_key: Option<String>,
149 pub delivery_policy: DeliveryPolicy,
150 pub slot_policy: SlotPolicy,
151 pub merge_key: MergeKey,
152 pub available_at_ms: u64,
153 pub payloads: Vec<QueuedWorkPayload>,
154}
155
156impl QueuedWorkBatchDraft {
157 pub fn new(
158 session_id: impl Into<String>,
159 delivery_policy: DeliveryPolicy,
160 slot_policy: SlotPolicy,
161 payloads: impl Into<Vec<QueuedWorkPayload>>,
162 ) -> Self {
163 Self {
164 session_id: session_id.into(),
165 source_key: None,
166 delivery_policy,
167 slot_policy,
168 merge_key: MergeKey::Never,
169 available_at_ms: 0,
170 payloads: payloads.into(),
171 }
172 }
173
174 pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
175 self.source_key = Some(source_key.into());
176 self
177 }
178
179 pub fn with_available_at_ms(mut self, available_at_ms: u64) -> Self {
180 self.available_at_ms = available_at_ms;
181 self
182 }
183
184 pub fn with_merge_key(mut self, merge_key: MergeKey) -> Self {
185 self.merge_key = merge_key;
186 self
187 }
188}
189
190#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
191#[serde(rename_all = "snake_case")]
192pub enum QueuedWorkClaimBoundary {
193 ActiveTurnCheckpoint,
194 Idle,
195}
196
197#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
198pub struct QueuedWorkCompletion {
199 pub session_id: String,
200 pub claim_id: String,
201 pub lease_token: String,
202 pub batch_ids: Vec<String>,
203}
204
205#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
206pub struct QueuedWorkClaim {
207 pub session_id: String,
208 pub claim_id: String,
209 pub owner_id: String,
210 pub lease_token: String,
211 pub fencing_token: u64,
212 pub claimed_at_epoch_ms: u64,
213 pub expires_at_epoch_ms: u64,
214 pub batches: Vec<QueuedWorkBatch>,
215}
216
217impl QueuedWorkClaim {
218 pub fn completion(&self) -> QueuedWorkCompletion {
219 QueuedWorkCompletion {
220 session_id: self.session_id.clone(),
221 claim_id: self.claim_id.clone(),
222 lease_token: self.lease_token.clone(),
223 batch_ids: self
224 .batches
225 .iter()
226 .map(|batch| batch.batch_id.clone())
227 .collect(),
228 }
229 }
230
231 pub fn is_empty(&self) -> bool {
232 self.batches.iter().all(|batch| batch.items.is_empty())
233 }
234
235 pub fn materialize_for_checkpoint(&self) -> QueuedCheckpointWork {
236 let messages = Vec::new();
237 let mut transient_messages = Vec::new();
238 let mut turn_causes = Vec::new();
239 for batch in &self.batches {
240 for item in &batch.items {
241 match &item.payload {
242 QueuedWorkPayload::TurnInput { input } => {
243 if let Some(message) = plugin_message_from_turn_input(input) {
244 transient_messages.push(message);
245 }
246 }
247 QueuedWorkPayload::ProcessWake { wake } => {
248 turn_causes.push(crate::process_wake_turn_cause(wake));
249 }
250 QueuedWorkPayload::SessionCommand { .. } => {}
251 }
252 }
253 }
254 QueuedCheckpointWork {
255 messages,
256 transient_messages,
257 turn_causes,
258 }
259 }
260
261 pub fn materialize_for_checkpoint_with_attachments(
262 &self,
263 attachment_store: &dyn crate::AttachmentStore,
264 ) -> Result<QueuedCheckpointWork, String> {
265 let messages = Vec::new();
266 let mut transient_messages = Vec::new();
267 let mut turn_causes = Vec::new();
268 for batch in &self.batches {
269 for item in &batch.items {
270 match &item.payload {
271 QueuedWorkPayload::TurnInput { input } => {
272 if let Some(message) = plugin_message_from_turn_input_with_attachments(
273 input,
274 attachment_store,
275 )? {
276 transient_messages.push(message);
277 }
278 }
279 QueuedWorkPayload::ProcessWake { wake } => {
280 turn_causes.push(crate::process_wake_turn_cause(wake));
281 }
282 QueuedWorkPayload::SessionCommand { .. } => {}
283 }
284 }
285 }
286 Ok(QueuedCheckpointWork {
287 messages,
288 transient_messages,
289 turn_causes,
290 })
291 }
292
293 pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
294 let mut accepted = Vec::new();
295 for batch in &self.batches {
296 let id = batch.source_key.as_deref().map(|source| {
297 source
298 .strip_prefix("host:")
299 .or_else(|| source.strip_prefix("injection:"))
300 .unwrap_or(source)
301 .to_string()
302 });
303 for item in &batch.items {
304 if let QueuedWorkPayload::TurnInput { input } = &item.payload
305 && let Some(message) = plugin_message_from_turn_input(input)
306 {
307 accepted.push(crate::AcceptedInjectedTurnInput {
308 id: id.clone(),
309 message,
310 });
311 }
312 }
313 }
314 accepted
315 }
316
317 pub fn exclusive_session_command(&self) -> Option<(&QueuedWorkBatch, &SessionCommand)> {
318 if self.batches.len() != 1 {
319 return None;
320 }
321 let batch = self.batches.first()?;
322 if batch.slot_policy != SlotPolicy::Exclusive || batch.items.len() != 1 {
323 return None;
324 }
325 let item = batch.items.first()?;
326 match &item.payload {
327 QueuedWorkPayload::SessionCommand { command } => Some((batch, command.as_ref())),
328 _ => None,
329 }
330 }
331
332 pub fn materialize_for_turn(&self) -> QueuedTurnWork {
333 let checkpoint = self.materialize_for_checkpoint();
334 let mut input_items = Vec::new();
335 let mut image_blobs = std::collections::HashMap::new();
336 let mut protocol_turn_options = None;
337 let mut trace_turn_id = None;
338 for batch in &self.batches {
339 for item in &batch.items {
340 if let QueuedWorkPayload::TurnInput { input } = &item.payload {
341 input_items.extend(input.items.clone());
342 image_blobs.extend(input.image_blobs.clone());
343 if protocol_turn_options.is_none() {
344 protocol_turn_options = input.protocol_turn_options.clone();
345 }
346 if trace_turn_id.is_none() {
347 trace_turn_id = input.trace_turn_id.clone();
348 }
349 }
350 }
351 }
352 QueuedTurnWork {
353 input: TurnInput {
354 items: input_items,
355 image_blobs,
356 protocol_turn_options,
357 trace_turn_id,
358 protocol_extension: None,
359 turn_context: crate::TurnContext::default(),
360 },
361 messages: checkpoint.messages,
362 turn_causes: checkpoint.turn_causes,
363 }
364 }
365}
366
367#[derive(Clone, Debug, Default)]
368pub struct QueuedCheckpointWork {
369 pub messages: Vec<PluginMessage>,
370 pub transient_messages: Vec<PluginMessage>,
371 pub turn_causes: Vec<TurnCause>,
372}
373
374#[derive(Clone, Debug)]
375pub struct QueuedTurnWork {
376 pub input: TurnInput,
377 pub messages: Vec<PluginMessage>,
378 pub turn_causes: Vec<TurnCause>,
379}
380
381pub fn process_wake_batch_draft(wake: ProcessWakeDelivery) -> QueuedWorkBatchDraft {
382 let source_key = format!("process:{}:event:{}:wake", wake.process_id, wake.sequence);
383 QueuedWorkBatchDraft::new(
384 wake.target_session_id.clone(),
385 DeliveryPolicy::EarliestSafeBoundary,
386 SlotPolicy::Exclusive,
387 vec![QueuedWorkPayload::process_wake(wake)],
388 )
389 .with_source_key(source_key)
390}
391
392fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
393 let mut text = Vec::new();
394 let mut images = Vec::new();
395 for item in &input.items {
396 match item {
397 crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
398 text.push(item_text.clone());
399 }
400 crate::InputItem::Text { .. } => {}
401 crate::InputItem::ImageRef { id } => {
402 if let Some(bytes) = input.image_blobs.get(id).cloned() {
403 images.push(bytes);
404 }
405 }
406 }
407 }
408 if text.is_empty() && images.is_empty() {
409 return None;
410 }
411 Some(PluginMessage {
412 role: crate::MessageRole::User,
413 content: text.join("\n"),
414 origin: None,
415 parts: Vec::new(),
416 images,
417 })
418}
419
420fn plugin_message_from_turn_input_with_attachments(
421 input: &TurnInput,
422 attachment_store: &dyn crate::AttachmentStore,
423) -> Result<Option<PluginMessage>, String> {
424 let normalized =
425 super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)?;
426 let has_image = normalized
427 .iter()
428 .any(|item| matches!(item, super::NormalizedItem::Image(_)));
429 if !has_image {
430 return Ok(plugin_message_from_turn_input(input));
431 }
432
433 let mut content = Vec::new();
434 let mut parts = Vec::new();
435 for item in normalized {
436 match item {
437 super::NormalizedItem::Text(text) if !text.is_empty() => {
438 let part_id = format!("queued.p{}", parts.len());
439 content.push(text.clone());
440 parts.push(crate::Part {
441 id: part_id,
442 kind: crate::PartKind::Text,
443 content: text,
444 attachment: None,
445 tool_call_id: None,
446 tool_name: None,
447 tool_replay: None,
448 prune_state: crate::PruneState::Intact,
449 reasoning_meta: None,
450 response_meta: None,
451 });
452 }
453 super::NormalizedItem::Text(_) => {}
454 super::NormalizedItem::Image(reference) => {
455 let part_id = format!("queued.p{}", parts.len());
456 parts.push(crate::Part {
457 id: part_id,
458 kind: crate::PartKind::Image,
459 content: String::new(),
460 attachment: Some(crate::session_model::message::PartAttachment { reference }),
461 tool_call_id: None,
462 tool_name: None,
463 tool_replay: None,
464 prune_state: crate::PruneState::Intact,
465 reasoning_meta: None,
466 response_meta: None,
467 });
468 }
469 }
470 }
471 if parts.is_empty() {
472 return Ok(None);
473 }
474 Ok(Some(PluginMessage {
475 role: crate::MessageRole::User,
476 content: content.join("\n"),
477 origin: None,
478 parts,
479 images: Vec::new(),
480 }))
481}