1use super::process::ProcessWakeDelivery;
2use crate::{PluginMessage, TurnCause, TurnInput};
3
4pub const QUEUED_WORK_CLAIM_TTL_MS: u64 = 30 * 1000;
5
6#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
7#[serde(tag = "kind", rename_all = "snake_case")]
8pub enum SessionCommand {
9 RefreshToolCatalog { reason: String },
14 ResetSession { reason: String },
15}
16
17impl SessionCommand {
18 pub fn kind(&self) -> &'static str {
19 match self {
20 Self::RefreshToolCatalog { .. } => "refresh_tool_catalog",
21 Self::ResetSession { .. } => "reset_session",
22 }
23 }
24
25 pub fn source_key(&self, idempotency_key: impl AsRef<str>) -> String {
26 format!("command:{}:{}", self.kind(), idempotency_key.as_ref())
27 }
28}
29
30#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
31pub struct SessionCommandReceipt {
32 pub session_id: String,
33 pub batch_id: String,
34 pub source_key: String,
35}
36
37#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum DeliveryPolicy {
40 EarliestSafeBoundary,
41 AfterCurrentTurnCommit,
42}
43
44impl DeliveryPolicy {
45 pub fn as_str(self) -> &'static str {
46 match self {
47 Self::EarliestSafeBoundary => "earliest_safe_boundary",
48 Self::AfterCurrentTurnCommit => "after_current_turn_commit",
49 }
50 }
51
52 pub fn from_wire_str(value: &str) -> Option<Self> {
53 match value {
54 "earliest_safe_boundary" => Some(Self::EarliestSafeBoundary),
55 "after_current_turn_commit" => Some(Self::AfterCurrentTurnCommit),
56 _ => None,
57 }
58 }
59}
60
61#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum SlotPolicy {
64 Join,
65 Exclusive,
66}
67
68impl SlotPolicy {
69 pub fn as_str(self) -> &'static str {
70 match self {
71 Self::Join => "join",
72 Self::Exclusive => "exclusive",
73 }
74 }
75
76 pub fn from_wire_str(value: &str) -> Option<Self> {
77 match value {
78 "join" => Some(Self::Join),
79 "exclusive" => Some(Self::Exclusive),
80 _ => None,
81 }
82 }
83}
84
85#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
86#[serde(rename_all = "snake_case")]
87pub enum MergeKey {
88 Never,
89 PayloadDefault,
90 Group(String),
91}
92
93#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
94#[serde(tag = "type", rename_all = "snake_case")]
95pub enum QueuedWorkPayload {
96 TurnInput { input: Box<TurnInput> },
97 ProcessWake { wake: Box<ProcessWakeDelivery> },
98 SessionCommand { command: Box<SessionCommand> },
99}
100
101#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
102#[serde(rename_all = "snake_case")]
103pub enum QueuedWorkClass {
104 SessionCommand,
105 TurnWork,
106}
107
108impl QueuedWorkPayload {
109 pub fn turn_input(input: TurnInput) -> Self {
110 Self::TurnInput {
111 input: Box::new(input),
112 }
113 }
114
115 pub fn process_wake(wake: ProcessWakeDelivery) -> Self {
116 Self::ProcessWake {
117 wake: Box::new(wake),
118 }
119 }
120
121 pub fn session_command(command: SessionCommand) -> Self {
122 Self::SessionCommand {
123 command: Box::new(command),
124 }
125 }
126
127 pub fn work_class(&self) -> QueuedWorkClass {
128 match self {
129 Self::SessionCommand { .. } => QueuedWorkClass::SessionCommand,
130 Self::TurnInput { .. } | Self::ProcessWake { .. } => QueuedWorkClass::TurnWork,
131 }
132 }
133}
134
135#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
136pub struct QueuedWorkItem {
137 pub item_id: String,
138 pub payload: QueuedWorkPayload,
139}
140
141#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
142pub struct QueuedWorkBatch {
143 pub batch_id: String,
144 pub session_id: String,
145 pub enqueue_seq: u64,
146 #[serde(default, skip_serializing_if = "Option::is_none")]
147 pub source_key: Option<String>,
148 pub delivery_policy: DeliveryPolicy,
149 pub slot_policy: SlotPolicy,
150 pub merge_key: MergeKey,
151 pub available_at_ms: u64,
152 pub enqueued_at_ms: u64,
153 pub items: Vec<QueuedWorkItem>,
154}
155
156impl QueuedWorkBatch {
157 pub fn work_class(&self) -> Option<QueuedWorkClass> {
158 work_class_for_payloads(self.items.iter().map(|item| &item.payload))
159 }
160
161 pub fn is_session_command_work(&self) -> bool {
162 self.work_class() == Some(QueuedWorkClass::SessionCommand)
163 }
164
165 pub fn is_turn_work(&self) -> bool {
166 self.work_class() == Some(QueuedWorkClass::TurnWork)
167 }
168}
169
170#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
171pub struct QueuedWorkBatchDraft {
172 pub session_id: String,
173 #[serde(default, skip_serializing_if = "Option::is_none")]
174 pub source_key: Option<String>,
175 pub delivery_policy: DeliveryPolicy,
176 pub slot_policy: SlotPolicy,
177 pub merge_key: MergeKey,
178 pub available_at_ms: u64,
179 pub payloads: Vec<QueuedWorkPayload>,
180}
181
182impl QueuedWorkBatchDraft {
183 pub fn new(
184 session_id: impl Into<String>,
185 delivery_policy: DeliveryPolicy,
186 slot_policy: SlotPolicy,
187 payloads: impl Into<Vec<QueuedWorkPayload>>,
188 ) -> Self {
189 Self {
190 session_id: session_id.into(),
191 source_key: None,
192 delivery_policy,
193 slot_policy,
194 merge_key: MergeKey::Never,
195 available_at_ms: 0,
196 payloads: payloads.into(),
197 }
198 }
199
200 pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
201 self.source_key = Some(source_key.into());
202 self
203 }
204
205 pub fn with_available_at_ms(mut self, available_at_ms: u64) -> Self {
206 self.available_at_ms = available_at_ms;
207 self
208 }
209
210 pub fn with_merge_key(mut self, merge_key: MergeKey) -> Self {
211 self.merge_key = merge_key;
212 self
213 }
214
215 pub fn work_class(&self) -> Option<QueuedWorkClass> {
216 work_class_for_payloads(self.payloads.iter())
217 }
218}
219
220fn work_class_for_payloads<'a>(
221 payloads: impl IntoIterator<Item = &'a QueuedWorkPayload>,
222) -> Option<QueuedWorkClass> {
223 let mut payloads = payloads.into_iter();
224 let first = payloads.next()?.work_class();
225 payloads
226 .all(|payload| payload.work_class() == first)
227 .then_some(first)
228}
229
230#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
231#[serde(rename_all = "snake_case")]
232pub enum QueuedWorkClaimBoundary {
233 ActiveTurnCheckpoint,
234 Idle,
235}
236
237#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
238pub struct QueuedWorkCompletion {
239 pub session_id: String,
240 pub claim_id: String,
241 pub lease_token: String,
242 pub batch_ids: Vec<String>,
243}
244
245#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
246pub struct QueuedWorkClaim {
247 pub session_id: String,
248 pub claim_id: String,
249 pub owner: crate::LeaseOwnerIdentity,
250 pub lease_token: String,
251 pub fencing_token: u64,
252 pub claimed_at_epoch_ms: u64,
253 pub expires_at_epoch_ms: u64,
254 pub batches: Vec<QueuedWorkBatch>,
255}
256
257impl QueuedWorkClaim {
258 pub fn completion(&self) -> QueuedWorkCompletion {
259 QueuedWorkCompletion {
260 session_id: self.session_id.clone(),
261 claim_id: self.claim_id.clone(),
262 lease_token: self.lease_token.clone(),
263 batch_ids: self
264 .batches
265 .iter()
266 .map(|batch| batch.batch_id.clone())
267 .collect(),
268 }
269 }
270
271 pub fn is_empty(&self) -> bool {
272 self.batches.iter().all(|batch| batch.items.is_empty())
273 }
274
275 pub fn materialize_for_checkpoint(&self) -> QueuedCheckpointWork {
276 let messages = Vec::new();
277 let mut transient_messages = Vec::new();
278 let mut turn_causes = Vec::new();
279 for batch in &self.batches {
280 for item in &batch.items {
281 match &item.payload {
282 QueuedWorkPayload::TurnInput { input } => {
283 if let Some(message) = plugin_message_from_turn_input(input) {
284 transient_messages.push(message);
285 }
286 }
287 QueuedWorkPayload::ProcessWake { wake } => {
288 turn_causes.push(crate::process_wake_turn_cause(wake));
289 }
290 QueuedWorkPayload::SessionCommand { .. } => {}
291 }
292 }
293 }
294 QueuedCheckpointWork {
295 messages,
296 transient_messages,
297 turn_causes,
298 }
299 }
300
301 pub async fn materialize_for_checkpoint_with_attachments(
302 &self,
303 attachment_store: &dyn crate::AttachmentStore,
304 ) -> Result<QueuedCheckpointWork, String> {
305 let messages = Vec::new();
306 let mut transient_messages = Vec::new();
307 let mut turn_causes = Vec::new();
308 for batch in &self.batches {
309 for item in &batch.items {
310 match &item.payload {
311 QueuedWorkPayload::TurnInput { input } => {
312 if let Some(message) =
313 plugin_message_from_turn_input_with_attachments(input, attachment_store)
314 .await?
315 {
316 transient_messages.push(message);
317 }
318 }
319 QueuedWorkPayload::ProcessWake { wake } => {
320 turn_causes.push(crate::process_wake_turn_cause(wake));
321 }
322 QueuedWorkPayload::SessionCommand { .. } => {}
323 }
324 }
325 }
326 Ok(QueuedCheckpointWork {
327 messages,
328 transient_messages,
329 turn_causes,
330 })
331 }
332
333 pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
334 let mut accepted = Vec::new();
335 for batch in &self.batches {
336 let id = batch.source_key.as_deref().map(|source| {
337 source
338 .strip_prefix("host:")
339 .or_else(|| source.strip_prefix("injection:"))
340 .unwrap_or(source)
341 .to_string()
342 });
343 for item in &batch.items {
344 if let QueuedWorkPayload::TurnInput { input } = &item.payload
345 && let Some(message) = plugin_message_from_turn_input(input)
346 {
347 accepted.push(crate::AcceptedInjectedTurnInput {
348 id: id.clone(),
349 message,
350 });
351 }
352 }
353 }
354 accepted
355 }
356
357 pub fn exclusive_session_command(&self) -> Option<(&QueuedWorkBatch, &SessionCommand)> {
358 if self.batches.len() != 1 {
359 return None;
360 }
361 let batch = self.batches.first()?;
362 if batch.slot_policy != SlotPolicy::Exclusive || batch.items.len() != 1 {
363 return None;
364 }
365 let item = batch.items.first()?;
366 match &item.payload {
367 QueuedWorkPayload::SessionCommand { command } => Some((batch, command.as_ref())),
368 _ => None,
369 }
370 }
371
372 pub fn materialize_for_turn(&self) -> QueuedTurnWork {
373 let checkpoint = self.materialize_for_checkpoint();
374 let mut input_items = Vec::new();
375 let mut image_blobs = std::collections::HashMap::new();
376 let mut protocol_turn_options = None;
377 let mut trace_turn_id = None;
378 for batch in &self.batches {
379 for item in &batch.items {
380 if let QueuedWorkPayload::TurnInput { input } = &item.payload {
381 input_items.extend(input.items.clone());
382 image_blobs.extend(input.image_blobs.clone());
383 if protocol_turn_options.is_none() {
384 protocol_turn_options = input.protocol_turn_options.clone();
385 }
386 if trace_turn_id.is_none() {
387 trace_turn_id = input.trace_turn_id.clone();
388 }
389 }
390 }
391 }
392 QueuedTurnWork {
393 input: TurnInput {
394 items: input_items,
395 image_blobs,
396 protocol_turn_options,
397 trace_turn_id,
398 protocol_extension: None,
399 turn_context: crate::TurnContext::default(),
400 },
401 messages: checkpoint.messages,
402 turn_causes: checkpoint.turn_causes,
403 }
404 }
405}
406
407#[derive(Clone, Debug, Default)]
408pub struct QueuedCheckpointWork {
409 pub messages: Vec<PluginMessage>,
410 pub transient_messages: Vec<PluginMessage>,
411 pub turn_causes: Vec<TurnCause>,
412}
413
414#[derive(Clone, Debug)]
415pub struct QueuedTurnWork {
416 pub input: TurnInput,
417 pub messages: Vec<PluginMessage>,
418 pub turn_causes: Vec<TurnCause>,
419}
420
421pub fn process_wake_batch_draft(wake: ProcessWakeDelivery) -> QueuedWorkBatchDraft {
422 let source_key = format!("process:{}:event:{}:wake", wake.process_id, wake.sequence);
423 QueuedWorkBatchDraft::new(
424 wake.target_session_id.clone(),
425 DeliveryPolicy::EarliestSafeBoundary,
426 SlotPolicy::Exclusive,
427 vec![QueuedWorkPayload::process_wake(wake)],
428 )
429 .with_source_key(source_key)
430}
431
432fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
433 let mut text = Vec::new();
434 let mut images = Vec::new();
435 for item in &input.items {
436 match item {
437 crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
438 text.push(item_text.clone());
439 }
440 crate::InputItem::Text { .. } => {}
441 crate::InputItem::ImageRef { id } => {
442 if let Some(bytes) = input.image_blobs.get(id).cloned() {
443 images.push(bytes);
444 }
445 }
446 }
447 }
448 if text.is_empty() && images.is_empty() {
449 return None;
450 }
451 Some(PluginMessage {
452 role: crate::MessageRole::User,
453 content: text.join("\n"),
454 origin: None,
455 parts: Vec::new(),
456 images,
457 })
458}
459
460async fn plugin_message_from_turn_input_with_attachments(
461 input: &TurnInput,
462 attachment_store: &dyn crate::AttachmentStore,
463) -> Result<Option<PluginMessage>, String> {
464 let normalized =
465 super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)
466 .await?;
467 let has_image = normalized
468 .iter()
469 .any(|item| matches!(item, super::NormalizedItem::Image(_)));
470 if !has_image {
471 return Ok(plugin_message_from_turn_input(input));
472 }
473
474 let mut content = Vec::new();
475 let mut parts = Vec::new();
476 for item in normalized {
477 match item {
478 super::NormalizedItem::Text(text) if !text.is_empty() => {
479 let part_id = format!("queued.p{}", parts.len());
480 content.push(text.clone());
481 parts.push(crate::Part {
482 id: part_id,
483 kind: crate::PartKind::Text,
484 content: text,
485 attachment: None,
486 tool_call_id: None,
487 tool_name: None,
488 tool_replay: None,
489 prune_state: crate::PruneState::Intact,
490 reasoning_meta: None,
491 response_meta: None,
492 });
493 }
494 super::NormalizedItem::Text(_) => {}
495 super::NormalizedItem::Image(reference) => {
496 let part_id = format!("queued.p{}", parts.len());
497 parts.push(crate::Part {
498 id: part_id,
499 kind: crate::PartKind::Image,
500 content: String::new(),
501 attachment: Some(crate::session_model::message::PartAttachment { reference }),
502 tool_call_id: None,
503 tool_name: None,
504 tool_replay: None,
505 prune_state: crate::PruneState::Intact,
506 reasoning_meta: None,
507 response_meta: None,
508 });
509 }
510 }
511 }
512 if parts.is_empty() {
513 return Ok(None);
514 }
515 Ok(Some(PluginMessage {
516 role: crate::MessageRole::User,
517 content: content.join("\n"),
518 origin: None,
519 parts,
520 images: Vec::new(),
521 }))
522}