1use super::process::ProcessWakeDelivery;
2use crate::{PluginMessage, TurnCause, TurnInput};
3
4pub const QUEUED_WORK_CLAIM_TTL_MS: u64 = 15 * 60 * 1000;
5
6#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
7#[serde(tag = "kind", rename_all = "snake_case")]
8pub enum SessionCommand {
9 RefreshToolSurface { reason: String },
14 ResetSession { reason: String },
15}
16
17impl SessionCommand {
18 pub fn kind(&self) -> &'static str {
19 match self {
20 Self::RefreshToolSurface { .. } => "refresh_tool_surface",
21 Self::ResetSession { .. } => "reset_session",
22 }
23 }
24
25 pub fn source_key(&self, idempotency_key: impl AsRef<str>) -> String {
26 format!("command:{}:{}", self.kind(), idempotency_key.as_ref())
27 }
28}
29
30#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
31pub struct SessionCommandReceipt {
32 pub session_id: String,
33 pub batch_id: String,
34 pub source_key: String,
35}
36
37#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum DeliveryPolicy {
40 EarliestSafeBoundary,
41 AfterCurrentTurnCommit,
42}
43
44impl DeliveryPolicy {
45 pub fn as_str(self) -> &'static str {
46 match self {
47 Self::EarliestSafeBoundary => "earliest_safe_boundary",
48 Self::AfterCurrentTurnCommit => "after_current_turn_commit",
49 }
50 }
51
52 pub fn from_wire_str(value: &str) -> Option<Self> {
53 match value {
54 "earliest_safe_boundary" => Some(Self::EarliestSafeBoundary),
55 "after_current_turn_commit" => Some(Self::AfterCurrentTurnCommit),
56 _ => None,
57 }
58 }
59}
60
61#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum SlotPolicy {
64 Join,
65 Exclusive,
66}
67
68impl SlotPolicy {
69 pub fn as_str(self) -> &'static str {
70 match self {
71 Self::Join => "join",
72 Self::Exclusive => "exclusive",
73 }
74 }
75
76 pub fn from_wire_str(value: &str) -> Option<Self> {
77 match value {
78 "join" => Some(Self::Join),
79 "exclusive" => Some(Self::Exclusive),
80 _ => None,
81 }
82 }
83}
84
85#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
86#[serde(rename_all = "snake_case")]
87pub enum MergeKey {
88 Never,
89 PayloadDefault,
90 Group(String),
91}
92
93#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
94#[serde(tag = "type", rename_all = "snake_case")]
95pub enum QueuedWorkPayload {
96 TurnInput { input: Box<TurnInput> },
97 ProcessWake { wake: Box<ProcessWakeDelivery> },
98 SessionCommand { command: Box<SessionCommand> },
99}
100
101impl QueuedWorkPayload {
102 pub fn turn_input(input: TurnInput) -> Self {
103 Self::TurnInput {
104 input: Box::new(input),
105 }
106 }
107
108 pub fn process_wake(wake: ProcessWakeDelivery) -> Self {
109 Self::ProcessWake {
110 wake: Box::new(wake),
111 }
112 }
113
114 pub fn session_command(command: SessionCommand) -> Self {
115 Self::SessionCommand {
116 command: Box::new(command),
117 }
118 }
119}
120
121#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
122pub struct QueuedWorkItem {
123 pub item_id: String,
124 pub payload: QueuedWorkPayload,
125}
126
127#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
128pub struct QueuedWorkBatch {
129 pub batch_id: String,
130 pub session_id: String,
131 pub enqueue_seq: u64,
132 #[serde(default, skip_serializing_if = "Option::is_none")]
133 pub source_key: Option<String>,
134 pub delivery_policy: DeliveryPolicy,
135 pub slot_policy: SlotPolicy,
136 pub merge_key: MergeKey,
137 pub available_at_ms: u64,
138 pub enqueued_at_ms: u64,
139 pub items: Vec<QueuedWorkItem>,
140}
141
142#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
143pub struct QueuedWorkBatchDraft {
144 pub session_id: String,
145 #[serde(default, skip_serializing_if = "Option::is_none")]
146 pub source_key: Option<String>,
147 pub delivery_policy: DeliveryPolicy,
148 pub slot_policy: SlotPolicy,
149 pub merge_key: MergeKey,
150 pub available_at_ms: u64,
151 pub payloads: Vec<QueuedWorkPayload>,
152}
153
154impl QueuedWorkBatchDraft {
155 pub fn new(
156 session_id: impl Into<String>,
157 delivery_policy: DeliveryPolicy,
158 slot_policy: SlotPolicy,
159 payloads: impl Into<Vec<QueuedWorkPayload>>,
160 ) -> Self {
161 Self {
162 session_id: session_id.into(),
163 source_key: None,
164 delivery_policy,
165 slot_policy,
166 merge_key: MergeKey::Never,
167 available_at_ms: 0,
168 payloads: payloads.into(),
169 }
170 }
171
172 pub fn with_source_key(mut self, source_key: impl Into<String>) -> Self {
173 self.source_key = Some(source_key.into());
174 self
175 }
176
177 pub fn with_available_at_ms(mut self, available_at_ms: u64) -> Self {
178 self.available_at_ms = available_at_ms;
179 self
180 }
181
182 pub fn with_merge_key(mut self, merge_key: MergeKey) -> Self {
183 self.merge_key = merge_key;
184 self
185 }
186}
187
188#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
189#[serde(rename_all = "snake_case")]
190pub enum QueuedWorkClaimBoundary {
191 ActiveTurnCheckpoint,
192 Idle,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
196pub struct QueuedWorkCompletion {
197 pub session_id: String,
198 pub claim_id: String,
199 pub lease_token: String,
200 pub batch_ids: Vec<String>,
201}
202
203#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
204pub struct QueuedWorkClaim {
205 pub session_id: String,
206 pub claim_id: String,
207 pub owner_id: String,
208 pub lease_token: String,
209 pub fencing_token: u64,
210 pub claimed_at_epoch_ms: u64,
211 pub expires_at_epoch_ms: u64,
212 pub batches: Vec<QueuedWorkBatch>,
213}
214
215impl QueuedWorkClaim {
216 pub fn completion(&self) -> QueuedWorkCompletion {
217 QueuedWorkCompletion {
218 session_id: self.session_id.clone(),
219 claim_id: self.claim_id.clone(),
220 lease_token: self.lease_token.clone(),
221 batch_ids: self
222 .batches
223 .iter()
224 .map(|batch| batch.batch_id.clone())
225 .collect(),
226 }
227 }
228
229 pub fn is_empty(&self) -> bool {
230 self.batches.iter().all(|batch| batch.items.is_empty())
231 }
232
233 pub fn materialize_for_checkpoint(&self) -> QueuedCheckpointWork {
234 let messages = Vec::new();
235 let mut transient_messages = Vec::new();
236 let mut turn_causes = Vec::new();
237 for batch in &self.batches {
238 for item in &batch.items {
239 match &item.payload {
240 QueuedWorkPayload::TurnInput { input } => {
241 if let Some(message) = plugin_message_from_turn_input(input) {
242 transient_messages.push(message);
243 }
244 }
245 QueuedWorkPayload::ProcessWake { wake } => {
246 turn_causes.push(crate::process_wake_turn_cause(wake));
247 }
248 QueuedWorkPayload::SessionCommand { .. } => {}
249 }
250 }
251 }
252 QueuedCheckpointWork {
253 messages,
254 transient_messages,
255 turn_causes,
256 }
257 }
258
259 pub async fn materialize_for_checkpoint_with_attachments(
260 &self,
261 attachment_store: &dyn crate::AttachmentStore,
262 ) -> Result<QueuedCheckpointWork, String> {
263 let messages = Vec::new();
264 let mut transient_messages = Vec::new();
265 let mut turn_causes = Vec::new();
266 for batch in &self.batches {
267 for item in &batch.items {
268 match &item.payload {
269 QueuedWorkPayload::TurnInput { input } => {
270 if let Some(message) =
271 plugin_message_from_turn_input_with_attachments(input, attachment_store)
272 .await?
273 {
274 transient_messages.push(message);
275 }
276 }
277 QueuedWorkPayload::ProcessWake { wake } => {
278 turn_causes.push(crate::process_wake_turn_cause(wake));
279 }
280 QueuedWorkPayload::SessionCommand { .. } => {}
281 }
282 }
283 }
284 Ok(QueuedCheckpointWork {
285 messages,
286 transient_messages,
287 turn_causes,
288 })
289 }
290
291 pub fn accepted_turn_inputs(&self) -> Vec<crate::AcceptedInjectedTurnInput> {
292 let mut accepted = Vec::new();
293 for batch in &self.batches {
294 let id = batch.source_key.as_deref().map(|source| {
295 source
296 .strip_prefix("host:")
297 .or_else(|| source.strip_prefix("injection:"))
298 .unwrap_or(source)
299 .to_string()
300 });
301 for item in &batch.items {
302 if let QueuedWorkPayload::TurnInput { input } = &item.payload
303 && let Some(message) = plugin_message_from_turn_input(input)
304 {
305 accepted.push(crate::AcceptedInjectedTurnInput {
306 id: id.clone(),
307 message,
308 });
309 }
310 }
311 }
312 accepted
313 }
314
315 pub fn exclusive_session_command(&self) -> Option<(&QueuedWorkBatch, &SessionCommand)> {
316 if self.batches.len() != 1 {
317 return None;
318 }
319 let batch = self.batches.first()?;
320 if batch.slot_policy != SlotPolicy::Exclusive || batch.items.len() != 1 {
321 return None;
322 }
323 let item = batch.items.first()?;
324 match &item.payload {
325 QueuedWorkPayload::SessionCommand { command } => Some((batch, command.as_ref())),
326 _ => None,
327 }
328 }
329
330 pub fn materialize_for_turn(&self) -> QueuedTurnWork {
331 let checkpoint = self.materialize_for_checkpoint();
332 let mut input_items = Vec::new();
333 let mut image_blobs = std::collections::HashMap::new();
334 let mut protocol_turn_options = None;
335 let mut trace_turn_id = None;
336 for batch in &self.batches {
337 for item in &batch.items {
338 if let QueuedWorkPayload::TurnInput { input } = &item.payload {
339 input_items.extend(input.items.clone());
340 image_blobs.extend(input.image_blobs.clone());
341 if protocol_turn_options.is_none() {
342 protocol_turn_options = input.protocol_turn_options.clone();
343 }
344 if trace_turn_id.is_none() {
345 trace_turn_id = input.trace_turn_id.clone();
346 }
347 }
348 }
349 }
350 QueuedTurnWork {
351 input: TurnInput {
352 items: input_items,
353 image_blobs,
354 protocol_turn_options,
355 trace_turn_id,
356 protocol_extension: None,
357 turn_context: crate::TurnContext::default(),
358 },
359 messages: checkpoint.messages,
360 turn_causes: checkpoint.turn_causes,
361 }
362 }
363}
364
365#[derive(Clone, Debug, Default)]
366pub struct QueuedCheckpointWork {
367 pub messages: Vec<PluginMessage>,
368 pub transient_messages: Vec<PluginMessage>,
369 pub turn_causes: Vec<TurnCause>,
370}
371
372#[derive(Clone, Debug)]
373pub struct QueuedTurnWork {
374 pub input: TurnInput,
375 pub messages: Vec<PluginMessage>,
376 pub turn_causes: Vec<TurnCause>,
377}
378
379pub fn process_wake_batch_draft(wake: ProcessWakeDelivery) -> QueuedWorkBatchDraft {
380 let source_key = format!("process:{}:event:{}:wake", wake.process_id, wake.sequence);
381 QueuedWorkBatchDraft::new(
382 wake.target_session_id.clone(),
383 DeliveryPolicy::EarliestSafeBoundary,
384 SlotPolicy::Exclusive,
385 vec![QueuedWorkPayload::process_wake(wake)],
386 )
387 .with_source_key(source_key)
388}
389
390fn plugin_message_from_turn_input(input: &TurnInput) -> Option<PluginMessage> {
391 let mut text = Vec::new();
392 let mut images = Vec::new();
393 for item in &input.items {
394 match item {
395 crate::InputItem::Text { text: item_text } if !item_text.is_empty() => {
396 text.push(item_text.clone());
397 }
398 crate::InputItem::Text { .. } => {}
399 crate::InputItem::ImageRef { id } => {
400 if let Some(bytes) = input.image_blobs.get(id).cloned() {
401 images.push(bytes);
402 }
403 }
404 }
405 }
406 if text.is_empty() && images.is_empty() {
407 return None;
408 }
409 Some(PluginMessage {
410 role: crate::MessageRole::User,
411 content: text.join("\n"),
412 origin: None,
413 parts: Vec::new(),
414 images,
415 })
416}
417
418async fn plugin_message_from_turn_input_with_attachments(
419 input: &TurnInput,
420 attachment_store: &dyn crate::AttachmentStore,
421) -> Result<Option<PluginMessage>, String> {
422 let normalized =
423 super::io::normalize_input_items(&input.items, &input.image_blobs, attachment_store)
424 .await?;
425 let has_image = normalized
426 .iter()
427 .any(|item| matches!(item, super::NormalizedItem::Image(_)));
428 if !has_image {
429 return Ok(plugin_message_from_turn_input(input));
430 }
431
432 let mut content = Vec::new();
433 let mut parts = Vec::new();
434 for item in normalized {
435 match item {
436 super::NormalizedItem::Text(text) if !text.is_empty() => {
437 let part_id = format!("queued.p{}", parts.len());
438 content.push(text.clone());
439 parts.push(crate::Part {
440 id: part_id,
441 kind: crate::PartKind::Text,
442 content: text,
443 attachment: None,
444 tool_call_id: None,
445 tool_name: None,
446 tool_replay: None,
447 prune_state: crate::PruneState::Intact,
448 reasoning_meta: None,
449 response_meta: None,
450 });
451 }
452 super::NormalizedItem::Text(_) => {}
453 super::NormalizedItem::Image(reference) => {
454 let part_id = format!("queued.p{}", parts.len());
455 parts.push(crate::Part {
456 id: part_id,
457 kind: crate::PartKind::Image,
458 content: String::new(),
459 attachment: Some(crate::session_model::message::PartAttachment { reference }),
460 tool_call_id: None,
461 tool_name: None,
462 tool_replay: None,
463 prune_state: crate::PruneState::Intact,
464 reasoning_meta: None,
465 response_meta: None,
466 });
467 }
468 }
469 }
470 if parts.is_empty() {
471 return Ok(None);
472 }
473 Ok(Some(PluginMessage {
474 role: crate::MessageRole::User,
475 content: content.join("\n"),
476 origin: None,
477 parts,
478 images: Vec::new(),
479 }))
480}