1use crate::core::rpc_protocol::{
10 AssistantEvent, RpcAttachment, RpcCommand, RpcEvent, TurnUsage,
11};
12use crate::{AgentEvent, LlmEvent, SessionEvent, StreamEvent};
13
/// Default upper bound, in bytes, for a single RPC frame: 1 MiB (2^20 bytes).
pub const MAX_FRAME_BYTES: usize = 1 << 20;
18
19pub fn parse_frame(line: &str, max_bytes: usize) -> Result<RpcCommand, RpcEvent> {
26 if line.len() > max_bytes {
27 return Err(RpcEvent::Error {
28 id: None,
29 message: "frame exceeds 1 MiB limit".to_string(),
30 });
31 }
32 serde_json::from_str::<RpcCommand>(line).map_err(|e| RpcEvent::Error {
33 id: None,
34 message: e.to_string(),
35 })
36}
37
38pub fn map_stream_event(ev: &StreamEvent) -> Option<RpcEvent> {
52 match ev {
53 StreamEvent::Llm(LlmEvent::Thinking(s)) => Some(RpcEvent::MessageUpdate {
54 event: AssistantEvent::ThinkingDelta { delta: s.clone() },
55 }),
56 StreamEvent::Llm(LlmEvent::Text(s)) => Some(RpcEvent::MessageUpdate {
57 event: AssistantEvent::TextDelta { delta: s.clone() },
58 }),
59 StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
60 Some(RpcEvent::MessageUpdate {
61 event: AssistantEvent::ToolcallStart {
62 tool_id: tool_id.clone(),
63 tool_name: tool_name.clone(),
64 },
65 })
66 }
67 StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
68 Some(RpcEvent::MessageUpdate {
69 event: AssistantEvent::ToolcallInputDelta {
70 tool_id: tool_id.clone(),
71 delta: delta.clone(),
72 },
73 })
74 }
75 StreamEvent::Llm(LlmEvent::ToolUse { tool_id, input, .. }) => {
77 Some(RpcEvent::MessageUpdate {
78 event: AssistantEvent::ToolcallInput {
79 tool_id: tool_id.clone(),
80 input: input.clone(),
81 },
82 })
83 }
84 StreamEvent::Llm(LlmEvent::ToolResult { tool_id, result }) => {
85 Some(RpcEvent::MessageUpdate {
86 event: AssistantEvent::ToolcallResult {
87 tool_id: tool_id.clone(),
88 result: result.clone(),
89 },
90 })
91 }
92 StreamEvent::Llm(LlmEvent::ToolResultDelta { .. }) => None,
94
95 StreamEvent::Agent(AgentEvent::SubagentStart {
96 subagent_id,
97 agent_name,
98 task_preview,
99 }) => Some(RpcEvent::SubagentStart {
100 subagent_id: *subagent_id,
101 agent_name: agent_name.clone(),
102 task_preview: task_preview.clone(),
103 }),
104 StreamEvent::Agent(AgentEvent::SubagentUpdate {
105 subagent_id,
106 agent_name,
107 status,
108 }) => Some(RpcEvent::SubagentUpdate {
109 subagent_id: *subagent_id,
110 agent_name: agent_name.clone(),
111 status: status.clone(),
112 }),
113 StreamEvent::Agent(AgentEvent::SubagentDone {
114 subagent_id,
115 agent_name,
116 result_preview,
117 duration_secs,
118 }) => Some(RpcEvent::SubagentDone {
119 subagent_id: *subagent_id,
120 agent_name: agent_name.clone(),
121 result_preview: result_preview.clone(),
122 duration_secs: *duration_secs,
123 }),
124 StreamEvent::Agent(AgentEvent::SteeringDelivered { .. }) => None,
126
127 StreamEvent::Session(_) => None,
130 }
131}
132
133pub fn accumulate_usage(acc: &mut TurnUsage, event: &SessionEvent) {
141 if let SessionEvent::Usage {
142 input_tokens,
143 output_tokens,
144 cache_read_input_tokens,
145 cache_creation_input_tokens,
146 model,
147 } = event
148 {
149 acc.input_tokens += input_tokens;
150 acc.output_tokens += output_tokens;
151 acc.cache_read_input_tokens += cache_read_input_tokens;
152 acc.cache_creation_input_tokens += cache_creation_input_tokens;
153 if acc.model.is_none() {
154 acc.model = model.clone();
155 }
156 }
157}
158
/// Wraps `p` in double quotes, prefixing every backslash and double quote
/// inside it with a backslash.
fn quote_path(p: &str) -> String {
    // Two bytes for the surrounding quotes; escapes may grow it further.
    let mut quoted = String::with_capacity(p.len() + 2);
    quoted.push('"');
    for ch in p.chars() {
        if matches!(ch, '\\' | '"') {
            quoted.push('\\');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
169
170pub fn build_user_content(message: &str, attachments: &[RpcAttachment]) -> String {
171 if attachments.is_empty() {
172 return message.to_string();
173 }
174 let parts: Vec<String> = attachments.iter().map(|a| quote_path(&a.path)).collect();
175 format!("[user attached files: {}]\n{}", parts.join(", "), message)
176}
177
178pub fn build_tools_list_body(tools_schema: &[serde_json::Value]) -> serde_json::Value {
188 serde_json::json!({
189 "ok": true,
190 "tools": tools_schema,
191 })
192}
193
#[cfg(test)]
mod tests {
    //! Unit tests for frame parsing, stream-event mapping, usage accumulation,
    //! user-content construction and the tools-list body.

    use super::*;
    use crate::core::rpc_protocol::{
        AssistantEvent, RpcAttachment, RpcCommand, RpcEvent, TurnUsage,
    };
    use crate::{AgentEvent, LlmEvent, SessionEvent, StreamEvent};
    use serde_json::json;

    // ---------------------------------------------------------------------
    // parse_frame
    // ---------------------------------------------------------------------

    #[test]
    fn parse_frame_valid_prompt() {
        let line = r#"{"type":"prompt","id":"abc","message":"hello"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_ok(), "should parse valid prompt frame");
        match result.unwrap() {
            RpcCommand::Prompt {
                id,
                message,
                attachments,
            } => {
                assert_eq!(id, "abc");
                assert_eq!(message, "hello");
                assert!(attachments.is_empty());
            }
            other => panic!("unexpected variant: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_shutdown() {
        let line = r#"{"type":"shutdown"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_ok());
        assert!(matches!(result.unwrap(), RpcCommand::Shutdown));
    }

    #[test]
    fn parse_frame_valid_follow_up() {
        let line = r#"{"type":"follow_up","id":"f1","message":"and then?"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        match result.unwrap() {
            RpcCommand::FollowUp { id, message } => {
                assert_eq!(id, "f1");
                assert_eq!(message, "and then?");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_abort() {
        let line = r#"{"type":"abort","id":"x"}"#;
        assert!(matches!(
            parse_frame(line, MAX_FRAME_BYTES).unwrap(),
            RpcCommand::Abort { .. }
        ));
    }

    #[test]
    fn parse_frame_malformed_json() {
        let line = "not json at all";
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, message } => {
                assert!(id.is_none(), "malformed-JSON error must have id=None");
                assert!(!message.is_empty(), "error message must be non-empty");
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_json_unknown_type() {
        let line = r#"{"type":"does_not_exist","id":"1"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_err(), "unknown type should fail to deserialise");
    }

    #[test]
    fn parse_frame_oversize() {
        let oversize = "x".repeat(MAX_FRAME_BYTES + 1);
        let result = parse_frame(&oversize, MAX_FRAME_BYTES);
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, message } => {
                assert!(id.is_none());
                assert!(
                    message.contains("1 MiB"),
                    "expected '1 MiB' in message, got: {message}"
                );
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    /// Exercises the size boundary itself: a frame whose byte length equals
    /// the limit is parsed, and the same frame is rejected once the limit is
    /// one byte smaller.
    #[test]
    fn parse_frame_exactly_at_limit_valid_json() {
        let line = r#"{"type":"get_state","id":"x"}"#;

        assert!(
            parse_frame(line, line.len()).is_ok(),
            "a frame of exactly max_bytes must be accepted"
        );
        assert!(
            parse_frame(line, line.len() - 1).is_err(),
            "a frame one byte over max_bytes must be rejected"
        );

        // The same frame is also comfortably inside the default limit.
        assert!(line.len() <= MAX_FRAME_BYTES);
        assert!(parse_frame(line, MAX_FRAME_BYTES).is_ok());
    }

    #[test]
    fn parse_frame_custom_small_limit() {
        let line = r#"{"type":"shutdown"}"#;
        let result = parse_frame(line, 5);
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, .. } => assert!(id.is_none()),
            other => panic!("unexpected: {:?}", other),
        }
    }

    // ---------------------------------------------------------------------
    // map_stream_event
    // ---------------------------------------------------------------------

    #[test]
    fn map_llm_thinking() {
        let ev = StreamEvent::Llm(LlmEvent::Thinking("hmm".to_string()));
        let rpc = map_stream_event(&ev).expect("Thinking must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ThinkingDelta { delta },
            } => assert_eq!(delta, "hmm"),
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_text() {
        let ev = StreamEvent::Llm(LlmEvent::Text("hi".to_string()));
        let rpc = map_stream_event(&ev).expect("Text must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::TextDelta { delta },
            } => assert_eq!(delta, "hi"),
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_use_start() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUseStart {
            tool_name: "bash".to_string(),
            tool_id: "tid1".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolUseStart must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallStart { tool_id, tool_name },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(tool_name, "bash");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_use_delta() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUseDelta {
            tool_id: "tid1".to_string(),
            delta: r#"{"cmd":"#.to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolUseDelta must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallInputDelta { tool_id, delta },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(delta, r#"{"cmd":"#);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    /// The mapped `ToolcallInput` event carries only `tool_id` and `input`.
    #[test]
    fn map_llm_tool_use_final_drops_tool_name() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUse {
            tool_name: "bash".to_string(),
            tool_id: "tid1".to_string(),
            input: json!({"cmd": "ls"}),
        });
        let rpc = map_stream_event(&ev).expect("ToolUse must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallInput { tool_id, input },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(input, json!({"cmd": "ls"}));
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_result() {
        let ev = StreamEvent::Llm(LlmEvent::ToolResult {
            tool_id: "tid1".to_string(),
            result: "output here".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolResult must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallResult { tool_id, result },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(result, "output here");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_result_delta_is_dropped() {
        let ev = StreamEvent::Llm(LlmEvent::ToolResultDelta {
            tool_id: "tid1".to_string(),
            delta: "partial".to_string(),
        });
        assert!(
            map_stream_event(&ev).is_none(),
            "ToolResultDelta must be dropped — wire format has no streaming-result variant"
        );
    }

    #[test]
    fn map_agent_subagent_start() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentStart {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            task_preview: "do thing".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("SubagentStart must produce an event");
        match rpc {
            RpcEvent::SubagentStart {
                subagent_id,
                agent_name,
                task_preview,
            } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(task_preview, "do thing");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_subagent_update() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentUpdate {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            status: "running".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("SubagentUpdate must produce an event");
        match rpc {
            RpcEvent::SubagentUpdate {
                subagent_id,
                agent_name,
                status,
            } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(status, "running");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_subagent_done() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentDone {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            result_preview: "done!".to_string(),
            duration_secs: 1.5,
        });
        let rpc = map_stream_event(&ev).expect("SubagentDone must produce an event");
        match rpc {
            RpcEvent::SubagentDone {
                subagent_id,
                agent_name,
                result_preview,
                duration_secs,
            } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(result_preview, "done!");
                assert!((duration_secs - 1.5).abs() < f64::EPSILON);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_steering_delivered_is_dropped() {
        let ev = StreamEvent::Agent(AgentEvent::SteeringDelivered {
            message: "steer".to_string(),
        });
        assert!(
            map_stream_event(&ev).is_none(),
            "SteeringDelivered must be dropped — internal hook signal"
        );
    }

    #[test]
    fn map_session_events_all_return_none() {
        let events: &[StreamEvent] = &[
            StreamEvent::Session(SessionEvent::Done),
            StreamEvent::Session(SessionEvent::Error("oops".to_string())),
            StreamEvent::Session(SessionEvent::MessageHistory(vec![])),
            StreamEvent::Session(SessionEvent::Usage {
                input_tokens: 1,
                output_tokens: 2,
                cache_read_input_tokens: 0,
                cache_creation_input_tokens: 0,
                model: None,
            }),
        ];
        for ev in events {
            assert!(
                map_stream_event(ev).is_none(),
                "Session event {:?} should return None",
                ev
            );
        }
    }

    // ---------------------------------------------------------------------
    // accumulate_usage
    // ---------------------------------------------------------------------

    /// An accumulator with every counter at zero and no model recorded.
    fn zero_usage() -> TurnUsage {
        TurnUsage {
            input_tokens: 0,
            output_tokens: 0,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            model: None,
        }
    }

    #[test]
    fn accumulate_usage_basic() {
        let mut acc = zero_usage();
        let ev = SessionEvent::Usage {
            input_tokens: 100,
            output_tokens: 50,
            cache_read_input_tokens: 10,
            cache_creation_input_tokens: 5,
            model: Some("claude-3-5".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.input_tokens, 100);
        assert_eq!(acc.output_tokens, 50);
        assert_eq!(acc.cache_read_input_tokens, 10);
        assert_eq!(acc.cache_creation_input_tokens, 5);
        assert_eq!(acc.model.as_deref(), Some("claude-3-5"));
    }

    #[test]
    fn accumulate_usage_additive_across_calls() {
        let mut acc = TurnUsage {
            input_tokens: 10,
            output_tokens: 5,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            model: Some("first-model".to_string()),
        };
        let ev = SessionEvent::Usage {
            input_tokens: 20,
            output_tokens: 8,
            cache_read_input_tokens: 2,
            cache_creation_input_tokens: 1,
            model: Some("second-model".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.input_tokens, 30);
        assert_eq!(acc.output_tokens, 13);
        assert_eq!(acc.cache_read_input_tokens, 2);
        assert_eq!(acc.cache_creation_input_tokens, 1);
        // The model already recorded in the accumulator is kept.
        assert_eq!(acc.model.as_deref(), Some("first-model"));
    }

    #[test]
    fn accumulate_usage_sets_model_when_none() {
        let mut acc = zero_usage();
        let ev = SessionEvent::Usage {
            input_tokens: 1,
            output_tokens: 1,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            model: Some("my-model".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.model.as_deref(), Some("my-model"));
    }

    #[test]
    fn accumulate_usage_ignores_done() {
        let mut acc = zero_usage();
        acc.input_tokens = 5;
        accumulate_usage(&mut acc, &SessionEvent::Done);
        assert_eq!(acc.input_tokens, 5, "Done must not mutate the accumulator");
    }

    #[test]
    fn accumulate_usage_ignores_error() {
        let mut acc = zero_usage();
        acc.output_tokens = 3;
        accumulate_usage(&mut acc, &SessionEvent::Error("boom".to_string()));
        assert_eq!(acc.output_tokens, 3, "Error must not mutate the accumulator");
    }

    #[test]
    fn accumulate_usage_ignores_message_history() {
        let mut acc = zero_usage();
        acc.input_tokens = 7;
        accumulate_usage(&mut acc, &SessionEvent::MessageHistory(vec![]));
        assert_eq!(
            acc.input_tokens, 7,
            "MessageHistory must not mutate the accumulator"
        );
    }

    // ---------------------------------------------------------------------
    // build_user_content
    // ---------------------------------------------------------------------

    /// An attachment that carries only a path (no display name, no MIME type).
    fn attachment(path: &str) -> RpcAttachment {
        RpcAttachment {
            path: path.to_string(),
            name: None,
            mime: None,
        }
    }

    #[test]
    fn build_user_content_no_attachments() {
        assert_eq!(build_user_content("hello", &[]), "hello");
    }

    #[test]
    fn build_user_content_single_attachment() {
        let attachments = vec![attachment("/tmp/a.txt")];
        let msg = build_user_content("check this", &attachments);
        assert!(msg.starts_with("[user attached files: \"/tmp/a.txt\"]"));
        assert!(msg.contains("check this"));
    }

    #[test]
    fn build_user_content_multiple_attachments() {
        let attachments = vec![attachment("/tmp/a.txt"), attachment("/tmp/b.pdf")];
        let msg = build_user_content("check these", &attachments);
        assert!(
            msg.contains("[user attached files: \"/tmp/a.txt\", \"/tmp/b.pdf\"]"),
            "paths must be quoted and comma-separated: {msg}"
        );
        assert!(msg.contains("check these"));
    }

    #[test]
    fn build_user_content_preserves_original_message() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/x".to_string(),
            name: Some("x".to_string()),
            mime: Some("text/plain".to_string()),
        }];
        let original = "multi\nline\nmessage";
        let msg = build_user_content(original, &attachments);
        assert!(
            msg.ends_with(original),
            "original message must appear verbatim at the end"
        );
    }

    #[test]
    fn build_user_content_path_with_comma_is_quoted() {
        let attachments = vec![attachment("/tmp/a,b.pdf")];
        let msg = build_user_content("look", &attachments);
        assert!(
            msg.contains("\"/tmp/a,b.pdf\""),
            "comma path must be wrapped in quotes: {msg}"
        );
        assert!(
            !msg.contains("[user attached files: /tmp/a,b.pdf]"),
            "bare unquoted comma path must not appear: {msg}"
        );
    }

    #[test]
    fn build_user_content_multiple_paths_each_quoted() {
        let attachments = vec![attachment("/p1"), attachment("/p2")];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/p1\", \"/p2\""),
            "each path must be individually quoted: {msg}"
        );
    }

    #[test]
    fn build_user_content_path_with_embedded_quote_is_escaped() {
        let attachments = vec![attachment("/tmp/he\"llo")];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/tmp/he\\\"llo\""),
            "embedded double-quote must be backslash-escaped: {msg}"
        );
    }

    #[test]
    fn build_user_content_path_with_backslash_is_escaped() {
        let attachments = vec![attachment("/tmp/a\\b")];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/tmp/a\\\\b\""),
            "backslash in path must be doubled: {msg}"
        );
    }

    // ---------------------------------------------------------------------
    // build_tools_list_body
    // ---------------------------------------------------------------------

    #[test]
    fn build_tools_list_body_empty() {
        let body = super::build_tools_list_body(&[]);
        assert_eq!(body["ok"], true);
        assert!(body["tools"].is_array());
        assert_eq!(body["tools"].as_array().unwrap().len(), 0);
    }

    #[test]
    fn build_tools_list_body_with_entries() {
        let schema = vec![
            json!({"name": "bash", "description": "Run bash", "input_schema": {"type": "object"}}),
            json!({"name": "read", "description": "Read file", "input_schema": {"type": "object"}}),
        ];
        let body = super::build_tools_list_body(&schema);
        assert_eq!(body["ok"], true);
        let tools = body["tools"].as_array().unwrap();
        assert_eq!(tools.len(), 2);
        assert_eq!(tools[0]["name"], "bash");
        assert_eq!(tools[1]["name"], "read");
    }

    /// Serialises the body to a string and parses it back, then checks the
    /// two properties named in the assertion messages (`ok === true` and
    /// `tools` being an array).
    #[test]
    fn build_tools_list_body_roundtrip_satisfies_bridge_contract() {
        let schema = vec![json!({"name": "bash", "description": "desc", "input_schema": {}})];
        let body = super::build_tools_list_body(&schema);
        let serialised = serde_json::to_string(&body).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&serialised).unwrap();
        assert_eq!(parsed["ok"], true, "bridge check: ok===true");
        assert!(
            parsed["tools"].is_array(),
            "bridge check: Array.isArray(tools)"
        );
    }

    // ---------------------------------------------------------------------
    // Locking pattern
    // ---------------------------------------------------------------------

    /// Models a "lock, snapshot, unlock, slow await, re-lock" sequence on a
    /// shared `tokio::sync::Mutex` and checks that another task can take the
    /// lock while the slow phase is in progress.
    ///
    /// NOTE(review): this test does not call `handle_compact` itself; it only
    /// exercises the locking pattern described in the assertion message.
    #[tokio::test]
    async fn handle_compact_releases_lock_before_slow_await() {
        use std::sync::Arc;
        use tokio::sync::Mutex;
        use tokio::time::{sleep, timeout, Duration};

        let shared: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));

        let shared2 = shared.clone();
        let task = tokio::spawn(async move {
            // Phase 1: short critical section; the guard is dropped at the
            // end of this block, before the slow await below.
            let snapshot = {
                let mut g = shared2.lock().await;
                *g += 1;
                *g
            };

            // Phase 2: the slow work, performed without holding the lock.
            sleep(Duration::from_millis(20)).await;

            // Phase 3: re-acquire the lock to publish the result.
            let mut g = shared2.lock().await;
            *g = snapshot + 100;
        });

        // Give the spawned task time to reach its slow phase.
        sleep(Duration::from_millis(5)).await;
        let acquired = timeout(Duration::from_millis(5), shared.lock()).await;
        assert!(
            acquired.is_ok(),
            "second task must acquire the lock during the slow phase — \
             handle_compact must NOT hold the lock across compact_conversation"
        );
        // Release the guard so the spawned task can finish phase 3.
        drop(acquired);

        task.await.unwrap();
        assert_eq!(*shared.lock().await, 101);
    }
}