1use std::sync::Arc;
6
7use kameo::prelude::*;
8
9use super::event::{ChangeEvent, CompressStrategy};
10use super::stream::AgentSendStream;
11use super::types::{ContextBackend, ScratchOpts};
12use crate::error::AgentError;
13use crate::message::ContextMessage;
14use crate::readonly::ReadOnly;
15use crate::role::Role;
16
17#[derive(Actor)]
38pub struct AgentContext<B: ContextBackend> {
39 backend: B,
40 immutable: ReadOnly<B::Message>,
41 compressed: Vec<B::Message>,
42 incremental: Vec<B::Message>,
43 #[expect(clippy::type_complexity, reason = "回调类型不可避免复杂")]
44 on_change: Option<Arc<dyn Fn(ChangeEvent<B::Message>) + Send + Sync>>,
45 #[expect(clippy::type_complexity, reason = "回调类型不可避免复杂")]
46 on_compressed: Option<
47 Arc<
48 dyn Fn(Vec<B::Message>, Vec<B::Message>) -> (Vec<B::Message>, Vec<B::Message>)
49 + Send
50 + Sync,
51 >,
52 >,
53}
54
55impl<B: ContextBackend> AgentContext<B> {
56 pub fn new(backend: B, immutable: Vec<B::Message>) -> Self {
61 Self {
62 backend,
63 immutable: ReadOnly::from(immutable),
64 compressed: Vec::new(),
65 incremental: Vec::new(),
66 on_change: None,
67 on_compressed: None,
68 }
69 }
70
71 pub fn with_on_change(
76 mut self,
77 f: impl Fn(ChangeEvent<B::Message>) + Send + Sync + 'static,
78 ) -> Self {
79 self.on_change = Some(Arc::new(f));
80 self
81 }
82
83 pub fn with_on_compressed(
89 mut self,
90 f: impl Fn(Vec<B::Message>, Vec<B::Message>) -> (Vec<B::Message>, Vec<B::Message>)
91 + Send
92 + Sync
93 + 'static,
94 ) -> Self {
95 self.on_compressed = Some(Arc::new(f));
96 self
97 }
98
99 fn default_summary_prompt() -> String {
100 "请将以下对话历史压缩为简洁摘要,保留关键信息、决策和上下文。输出一条 system 消息。"
101 .to_string()
102 }
103}
104
105pub struct AppendMsg<M> {
113 pub message: M,
115}
116
117impl<B: ContextBackend> Message<AppendMsg<B::Message>> for AgentContext<B> {
118 type Reply = ();
119
120 async fn handle(
121 &mut self,
122 msg: AppendMsg<B::Message>,
123 _ctx: &mut Context<Self, Self::Reply>,
124 ) -> Self::Reply {
125 self.incremental.push(msg.message);
126 if let Some(cb) = &self.on_change
127 && let Some(last) = self.incremental.last().cloned()
128 {
129 cb(ChangeEvent::Appended(last));
130 }
131 }
132}
133
134pub struct Len;
136
137impl<B: ContextBackend> Message<Len> for AgentContext<B> {
138 type Reply = usize;
139
140 async fn handle(&mut self, _msg: Len, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
141 self.immutable.len() + self.compressed.len() + self.incremental.len()
142 }
143}
144
145pub struct IsEmpty;
147
148impl<B: ContextBackend> Message<IsEmpty> for AgentContext<B> {
149 type Reply = bool;
150
151 async fn handle(
152 &mut self,
153 _msg: IsEmpty,
154 _ctx: &mut Context<Self, Self::Reply>,
155 ) -> Self::Reply {
156 self.immutable.is_empty() && self.compressed.is_empty() && self.incremental.is_empty()
157 }
158}
159
160pub struct ExtendMsg<M> {
164 pub messages: Vec<M>,
166}
167
168impl<B: ContextBackend> Message<ExtendMsg<B::Message>> for AgentContext<B> {
169 type Reply = ();
170
171 async fn handle(
172 &mut self,
173 msg: ExtendMsg<B::Message>,
174 _ctx: &mut Context<Self, Self::Reply>,
175 ) -> Self::Reply {
176 for m in msg.messages {
177 self.incremental.push(m);
178 if let Some(cb) = &self.on_change
179 && let Some(last) = self.incremental.last().cloned()
180 {
181 cb(ChangeEvent::Appended(last));
182 }
183 }
184 }
185}
186
187pub struct SilentAppendMsg<M> {
191 pub message: M,
193}
194
195impl<B: ContextBackend> Message<SilentAppendMsg<B::Message>> for AgentContext<B> {
196 type Reply = ();
197
198 async fn handle(
199 &mut self,
200 msg: SilentAppendMsg<B::Message>,
201 _ctx: &mut Context<Self, Self::Reply>,
202 ) -> Self::Reply {
203 self.incremental.push(msg.message);
204 }
205}
206
207pub struct Get(pub usize);
212
213impl<B: ContextBackend> Message<Get> for AgentContext<B> {
214 type Reply = Option<B::Message>;
215
216 async fn handle(&mut self, msg: Get, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
217 let idx = msg.0;
218 let imm_len = self.immutable.len();
219 let comp_len = self.compressed.len();
220 if idx < imm_len {
221 Some(self.immutable[idx].clone())
222 } else if idx < imm_len + comp_len {
223 Some(self.compressed[idx - imm_len].clone())
224 } else if idx < imm_len + comp_len + self.incremental.len() {
225 Some(self.incremental[idx - imm_len - comp_len].clone())
226 } else {
227 None
228 }
229 }
230}
231
232pub struct MessagesMsg;
236
237impl<B: ContextBackend> Message<MessagesMsg> for AgentContext<B> {
238 type Reply = Vec<B::Message>;
239
240 async fn handle(
241 &mut self,
242 _msg: MessagesMsg,
243 _ctx: &mut Context<Self, Self::Reply>,
244 ) -> Self::Reply {
245 self.immutable
246 .iter()
247 .chain(self.compressed.iter())
248 .chain(self.incremental.iter())
249 .cloned()
250 .collect()
251 }
252}
253
254pub struct ImmutableMsg;
256
257impl<B: ContextBackend> Message<ImmutableMsg> for AgentContext<B> {
258 type Reply = Vec<B::Message>;
259
260 async fn handle(
261 &mut self,
262 _msg: ImmutableMsg,
263 _ctx: &mut Context<Self, Self::Reply>,
264 ) -> Self::Reply {
265 self.immutable.to_vec()
266 }
267}
268
269pub struct CompressedMsg;
271
272impl<B: ContextBackend> Message<CompressedMsg> for AgentContext<B> {
273 type Reply = Vec<B::Message>;
274
275 async fn handle(
276 &mut self,
277 _msg: CompressedMsg,
278 _ctx: &mut Context<Self, Self::Reply>,
279 ) -> Self::Reply {
280 self.compressed.clone()
281 }
282}
283
284pub struct IncrementalMsg;
286
287impl<B: ContextBackend> Message<IncrementalMsg> for AgentContext<B> {
288 type Reply = Vec<B::Message>;
289
290 async fn handle(
291 &mut self,
292 _msg: IncrementalMsg,
293 _ctx: &mut Context<Self, Self::Reply>,
294 ) -> Self::Reply {
295 self.incremental.clone()
296 }
297}
298
299pub struct FindByRoleMsg(pub Role);
301
302impl<B: ContextBackend> Message<FindByRoleMsg> for AgentContext<B> {
303 type Reply = Vec<B::Message>;
304
305 async fn handle(
306 &mut self,
307 msg: FindByRoleMsg,
308 _ctx: &mut Context<Self, Self::Reply>,
309 ) -> Self::Reply {
310 self.immutable
311 .iter()
312 .chain(self.compressed.iter())
313 .chain(self.incremental.iter())
314 .filter(|m| m.role() == msg.0)
315 .cloned()
316 .collect()
317 }
318}
319
320pub struct UpdateMsg<M> {
324 pub index: usize,
326 pub message: M,
328}
329
330impl<B: ContextBackend> Message<UpdateMsg<B::Message>> for AgentContext<B> {
331 type Reply = Result<(), AgentError>;
332
333 async fn handle(
334 &mut self,
335 msg: UpdateMsg<B::Message>,
336 _ctx: &mut Context<Self, Self::Reply>,
337 ) -> Self::Reply {
338 if msg.index >= self.incremental.len() {
339 return Err(AgentError::Context("索引越界".into()));
340 }
341 let old = std::mem::replace(&mut self.incremental[msg.index], msg.message);
342 if let Some(cb) = &self.on_change {
343 cb(ChangeEvent::Updated {
344 index: msg.index,
345 old,
346 new: self.incremental[msg.index].clone(),
347 });
348 }
349 Ok(())
350 }
351}
352
353pub struct InsertMsg<M> {
357 pub index: usize,
359 pub message: M,
361}
362
363impl<B: ContextBackend> Message<InsertMsg<B::Message>> for AgentContext<B> {
364 type Reply = Result<(), AgentError>;
365
366 async fn handle(
367 &mut self,
368 msg: InsertMsg<B::Message>,
369 _ctx: &mut Context<Self, Self::Reply>,
370 ) -> Self::Reply {
371 if msg.index > self.incremental.len() {
372 return Err(AgentError::Context("索引越界".into()));
373 }
374 self.incremental.insert(msg.index, msg.message);
375 if let Some(cb) = &self.on_change {
376 cb(ChangeEvent::Inserted {
377 index: msg.index,
378 message: self.incremental[msg.index].clone(),
379 });
380 }
381 Ok(())
382 }
383}
384
385pub struct RemoveMsg {
389 pub index: usize,
391}
392
393impl<B: ContextBackend> Message<RemoveMsg> for AgentContext<B> {
394 type Reply = Result<(), AgentError>;
395
396 async fn handle(
397 &mut self,
398 msg: RemoveMsg,
399 _ctx: &mut Context<Self, Self::Reply>,
400 ) -> Self::Reply {
401 if msg.index >= self.incremental.len() {
402 return Err(AgentError::Context("索引越界".into()));
403 }
404 let removed = self.incremental.remove(msg.index);
405 if let Some(cb) = &self.on_change {
406 cb(ChangeEvent::Removed {
407 index: msg.index,
408 message: removed,
409 });
410 }
411 Ok(())
412 }
413}
414
415pub struct PopMsg;
419
420impl<B: ContextBackend> Message<PopMsg> for AgentContext<B> {
421 type Reply = Option<B::Message>;
422
423 async fn handle(&mut self, _msg: PopMsg, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
424 let popped = self.incremental.pop();
425 if let Some(ref msg) = popped
426 && let Some(cb) = &self.on_change
427 {
428 cb(ChangeEvent::Popped(msg.clone()));
429 }
430 popped
431 }
432}
433
434pub struct RetainMsg {
438 pub role: Role,
440}
441
442impl<B: ContextBackend> Message<RetainMsg> for AgentContext<B> {
443 type Reply = ();
444
445 async fn handle(
446 &mut self,
447 msg: RetainMsg,
448 _ctx: &mut Context<Self, Self::Reply>,
449 ) -> Self::Reply {
450 let mut removed = Vec::new();
451 let role = msg.role;
452 self.incremental.retain(|m| {
453 if m.role() == role {
454 true
455 } else {
456 removed.push(m.clone());
457 false
458 }
459 });
460 if let Some(cb) = &self.on_change {
461 cb(ChangeEvent::Retained { role, removed });
462 }
463 }
464}
465
466pub struct ClearMsg;
470
471impl<B: ContextBackend> Message<ClearMsg> for AgentContext<B> {
472 type Reply = ();
473
474 async fn handle(
475 &mut self,
476 _msg: ClearMsg,
477 _ctx: &mut Context<Self, Self::Reply>,
478 ) -> Self::Reply {
479 if !self.incremental.is_empty() {
480 let removed = std::mem::take(&mut self.incremental);
481 if let Some(cb) = &self.on_change {
482 cb(ChangeEvent::Cleared { removed });
483 }
484 }
485 }
486}
487
488pub struct CompressMsg<O> {
493 pub strategy: CompressStrategy,
495 pub opts: O,
497}
498
499impl<B: ContextBackend> Message<CompressMsg<B::Opts>> for AgentContext<B> {
500 type Reply = ();
501
502 async fn handle(
503 &mut self,
504 msg: CompressMsg<B::Opts>,
505 _ctx: &mut Context<Self, Self::Reply>,
506 ) -> Self::Reply {
507 match msg.strategy {
508 CompressStrategy::Summarize { keep, prompt } => {
509 let total = self.incremental.len();
510 if total > keep {
511 let split = total - keep;
512 let to_summarize: Vec<B::Message> = self.incremental.drain(..split).collect();
513 if !to_summarize.is_empty() {
514 let summary_prompt = prompt.unwrap_or_else(Self::default_summary_prompt);
515 let mut summary_messages =
516 vec![self.backend.system_message(summary_prompt)];
517 summary_messages.append(&mut self.compressed);
518 summary_messages.extend(to_summarize);
519 let result = self.backend.send(&summary_messages, &msg.opts).await;
520 if let Ok(response) = result {
521 if let Ok(raw_msgs) =
522 self.backend.extract_messages_from_backend_response(
523 std::slice::from_ref(&response),
524 )
525 {
526 if let Ok(request_msgs) = self.backend.to_request_messages(raw_msgs)
527 {
528 let summary: Vec<B::Message> = request_msgs
529 .into_iter()
530 .map(|msg| self.backend.to_system_message(msg))
531 .collect();
532 let kept: Vec<B::Message> =
533 self.incremental.drain(..).collect();
534 let (final_summary, final_kept) =
535 if let Some(cb) = &self.on_compressed {
536 cb(summary, kept)
537 } else {
538 (summary, kept)
539 };
540 self.compressed = final_summary;
541 self.incremental = final_kept;
542 } else {
543 log::warn!("压缩摘要转换请求格式失败,已跳过");
544 }
545 } else {
546 log::warn!("压缩摘要提取消息失败,已跳过");
547 }
548 }
549 }
550 }
551 }
552 }
553 }
554}
555
556pub struct SendMsg<O> {
562 pub opts: O,
564}
565
566impl<B: ContextBackend> Message<SendMsg<B::Opts>> for AgentContext<B> {
567 type Reply = Result<B::Response, AgentError>;
568
569 async fn handle(
570 &mut self,
571 msg: SendMsg<B::Opts>,
572 _ctx: &mut Context<Self, Self::Reply>,
573 ) -> Self::Reply {
574 let scratch = msg.opts.scratch().map(|s| s.to_string());
575 let mut all_messages: Vec<B::Message> = self
576 .immutable
577 .iter()
578 .chain(self.compressed.iter())
579 .chain(self.incremental.iter())
580 .cloned()
581 .collect();
582 if let Some(content) = scratch {
583 all_messages.push(self.backend.system_message(content));
584 }
585 let response = self.backend.send(&all_messages, &msg.opts).await?;
586 let raw_msgs = self
587 .backend
588 .extract_messages_from_backend_response(std::slice::from_ref(&response))?;
589 let request_msgs = self.backend.to_request_messages(raw_msgs)?;
590 for msg in &request_msgs {
591 self.incremental.push(msg.clone());
592 if let Some(cb) = &self.on_change {
593 cb(ChangeEvent::Appended(msg.clone()));
594 }
595 }
596 Ok(response)
597 }
598}
599
600pub struct SendStreamMsg<O> {
606 pub opts: O,
608}
609
610impl<B: ContextBackend + Clone> Message<SendStreamMsg<B::Opts>> for AgentContext<B> {
611 type Reply = AgentSendStream<B>;
612
613 async fn handle(
614 &mut self,
615 msg: SendStreamMsg<B::Opts>,
616 ctx: &mut Context<Self, Self::Reply>,
617 ) -> Self::Reply {
618 let scratch = msg.opts.scratch().map(|s| s.to_string());
619 let mut all_messages: Vec<B::Message> = self
620 .immutable
621 .iter()
622 .chain(self.compressed.iter())
623 .chain(self.incremental.iter())
624 .cloned()
625 .collect();
626 if let Some(content) = scratch {
627 all_messages.push(self.backend.system_message(content));
628 }
629 let stream = self.backend.send_stream(all_messages, msg.opts);
630 AgentSendStream::new(
631 self.backend.clone(),
632 stream,
633 ctx.actor_ref().clone(),
634 self.on_change.clone(),
635 )
636 }
637}
638
639pub struct EstimateTokensMsg;
644
645impl<B: ContextBackend> Message<EstimateTokensMsg> for AgentContext<B> {
646 type Reply = usize;
647
648 async fn handle(
649 &mut self,
650 _msg: EstimateTokensMsg,
651 _ctx: &mut Context<Self, Self::Reply>,
652 ) -> Self::Reply {
653 let all: Vec<B::Message> = self
654 .immutable
655 .iter()
656 .chain(self.compressed.iter())
657 .chain(self.incremental.iter())
658 .cloned()
659 .collect();
660 self.backend.estimate_tokens(&all).await.unwrap_or(0)
661 }
662}
663
664pub struct IsFullMsg;
668
669impl<B: ContextBackend> Message<IsFullMsg> for AgentContext<B> {
670 type Reply = bool;
671
672 async fn handle(
673 &mut self,
674 _msg: IsFullMsg,
675 _ctx: &mut Context<Self, Self::Reply>,
676 ) -> Self::Reply {
677 let all: Vec<B::Message> = self
678 .immutable
679 .iter()
680 .chain(self.compressed.iter())
681 .chain(self.incremental.iter())
682 .cloned()
683 .collect();
684 let tokens = self
685 .backend
686 .estimate_tokens(&all)
687 .await
688 .unwrap_or(usize::MAX);
689 tokens >= self.backend.context_window()
690 }
691}
692
693pub struct ToJsonlMsg;
697
698impl<B: ContextBackend> Message<ToJsonlMsg> for AgentContext<B> {
699 type Reply = Result<String, AgentError>;
700
701 async fn handle(
702 &mut self,
703 _msg: ToJsonlMsg,
704 _ctx: &mut Context<Self, Self::Reply>,
705 ) -> Self::Reply {
706 let lines: Vec<String> = self
707 .immutable
708 .iter()
709 .chain(self.compressed.iter())
710 .chain(self.incremental.iter())
711 .map(|m| self.backend.message_to_jsonl(m))
712 .collect::<Result<_, _>>()?;
713 Ok(lines.join("\n"))
714 }
715}
716
717pub struct FromJsonlMsg {
722 pub jsonl: String,
724}
725
726impl<B: ContextBackend> Message<FromJsonlMsg> for AgentContext<B> {
727 type Reply = Result<(), AgentError>;
728
729 async fn handle(
730 &mut self,
731 msg: FromJsonlMsg,
732 _ctx: &mut Context<Self, Self::Reply>,
733 ) -> Self::Reply {
734 for line in msg.jsonl.lines() {
735 let line = line.trim();
736 if line.is_empty() {
737 continue;
738 }
739 let message: B::Message = self.backend.message_from_jsonl(line)?;
740 self.incremental.push(message.clone());
741 if let Some(ref cb) = self.on_change {
742 cb(ChangeEvent::Appended(message));
743 }
744 }
745 Ok(())
746 }
747}