Skip to main content

agent_context/context/
actor.rs

1//! [`AgentContext`] Actor 及所有 [`Message`] 实现。
2//!
3//! 管理三区消息模型的状态,提供消息增删改查、模型对话、上下文压缩等操作。
4
5use std::sync::Arc;
6
7use kameo::prelude::*;
8
9use super::event::{ChangeEvent, CompressStrategy};
10use super::stream::AgentSendStream;
11use super::types::{ContextBackend, ScratchOpts};
12use crate::error::AgentError;
13use crate::message::ContextMessage;
14use crate::readonly::ReadOnly;
15use crate::role::Role;
16
17// ---------------------------------------------------------------------------
18// AgentContext Actor
19// ---------------------------------------------------------------------------
20
21/// LLM 对话上下文管理器,kameo Actor。
22///
23/// 管理三区 + Scratch 消息模型(immutable → compressed → incremental → scratch),提供:
24/// - 消息增删改查([`AppendMsg`]、[`UpdateMsg`]、[`RemoveMsg`] 等)
25/// - 对话发送([`SendMsg`]、[`SendStreamMsg`]),支持通过 [`ScratchOpts`] 追加临时元数据
26/// - 上下文压缩([`CompressMsg`])
27/// - Token 估算和溢出检测([`EstimateTokensMsg`]、[`IsFullMsg`])
28/// - 变更回调([`ChangeEvent`])
29///
30/// ## 构造
31///
32/// ```ignore
33/// let ctx = AgentContext::new(backend, vec![])
34///     .with_on_change(|event| { /* 处理变更 */ });
35/// let actor = AgentContext::spawn(ctx);
36/// ```
37#[derive(Actor)]
38pub struct AgentContext<B: ContextBackend> {
39    backend: B,
40    immutable: ReadOnly<B::Message>,
41    compressed: Vec<B::Message>,
42    incremental: Vec<B::Message>,
43    #[expect(clippy::type_complexity, reason = "回调类型不可避免复杂")]
44    on_change: Option<Arc<dyn Fn(ChangeEvent<B::Message>) + Send + Sync>>,
45    #[expect(clippy::type_complexity, reason = "回调类型不可避免复杂")]
46    on_compressed: Option<
47        Arc<
48            dyn Fn(Vec<B::Message>, Vec<B::Message>) -> (Vec<B::Message>, Vec<B::Message>)
49                + Send
50                + Sync,
51        >,
52    >,
53}
54
55impl<B: ContextBackend> AgentContext<B> {
56    /// 创建新的上下文管理器。
57    ///
58    /// - `backend`: 实现了 [`ContextBackend`] 的 LLM 后端实例
59    /// - `immutable`: 初始不可变消息(系统提示词等),放入 immutable 区
60    pub fn new(backend: B, immutable: Vec<B::Message>) -> Self {
61        Self {
62            backend,
63            immutable: ReadOnly::from(immutable),
64            compressed: Vec::new(),
65            incremental: Vec::new(),
66            on_change: None,
67            on_compressed: None,
68        }
69    }
70
71    /// 注册增量区变更回调。
72    ///
73    /// 每次对 incremental 区的写操作(追加/更新/插入/移除/清空等)都会触发此回调。
74    /// 用于 CLI 实时展示、日志记录等场景。
75    pub fn with_on_change(
76        mut self,
77        f: impl Fn(ChangeEvent<B::Message>) + Send + Sync + 'static,
78    ) -> Self {
79        self.on_change = Some(Arc::new(f));
80        self
81    }
82
83    /// 注册压缩后处理回调。
84    ///
85    /// 在 [`CompressMsg`] 生成摘要后、写入 compressed 区之前调用。
86    /// 回调接收 `(摘要消息列表, 保留消息列表)`,返回 `(最终摘要, 最终保留)`。
87    /// 用于自定义后处理(如过滤、重新排序摘要内容)。
88    pub fn with_on_compressed(
89        mut self,
90        f: impl Fn(Vec<B::Message>, Vec<B::Message>) -> (Vec<B::Message>, Vec<B::Message>)
91        + Send
92        + Sync
93        + 'static,
94    ) -> Self {
95        self.on_compressed = Some(Arc::new(f));
96        self
97    }
98
99    fn default_summary_prompt() -> String {
100        "请将以下对话历史压缩为简洁摘要,保留关键信息、决策和上下文。输出一条 system 消息。"
101            .to_string()
102    }
103}
104
105// ---------------------------------------------------------------------------
106// Actor Messages
107// ---------------------------------------------------------------------------
108
109/// 追加一条消息到 incremental 区末尾。Reply = `()`。
110///
111/// 触发 [`ChangeEvent::Appended`] 回调。
112pub struct AppendMsg<M> {
113    /// 要追加的消息
114    pub message: M,
115}
116
117impl<B: ContextBackend> Message<AppendMsg<B::Message>> for AgentContext<B> {
118    type Reply = ();
119
120    async fn handle(
121        &mut self,
122        msg: AppendMsg<B::Message>,
123        _ctx: &mut Context<Self, Self::Reply>,
124    ) -> Self::Reply {
125        self.incremental.push(msg.message);
126        if let Some(cb) = &self.on_change
127            && let Some(last) = self.incremental.last().cloned()
128        {
129            cb(ChangeEvent::Appended(last));
130        }
131    }
132}
133
134/// 获取三区总消息数。Reply = `usize`。
135pub struct Len;
136
137impl<B: ContextBackend> Message<Len> for AgentContext<B> {
138    type Reply = usize;
139
140    async fn handle(&mut self, _msg: Len, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
141        self.immutable.len() + self.compressed.len() + self.incremental.len()
142    }
143}
144
145/// 检查三区是否全部为空。Reply = `bool`。
146pub struct IsEmpty;
147
148impl<B: ContextBackend> Message<IsEmpty> for AgentContext<B> {
149    type Reply = bool;
150
151    async fn handle(
152        &mut self,
153        _msg: IsEmpty,
154        _ctx: &mut Context<Self, Self::Reply>,
155    ) -> Self::Reply {
156        self.immutable.is_empty() && self.compressed.is_empty() && self.incremental.is_empty()
157    }
158}
159
160/// 批量追加消息到 incremental 区。Reply = `()`。
161///
162/// 每条消息单独触发 [`ChangeEvent::Appended`]。
163pub struct ExtendMsg<M> {
164    /// 要批量追加的消息列表
165    pub messages: Vec<M>,
166}
167
168impl<B: ContextBackend> Message<ExtendMsg<B::Message>> for AgentContext<B> {
169    type Reply = ();
170
171    async fn handle(
172        &mut self,
173        msg: ExtendMsg<B::Message>,
174        _ctx: &mut Context<Self, Self::Reply>,
175    ) -> Self::Reply {
176        for m in msg.messages {
177            self.incremental.push(m);
178            if let Some(cb) = &self.on_change
179                && let Some(last) = self.incremental.last().cloned()
180            {
181                cb(ChangeEvent::Appended(last));
182            }
183        }
184    }
185}
186
187/// 静默追加消息,不触发 [`ChangeEvent`] 回调。Reply = `()`。
188///
189/// 用于 [`AgentSendStream`] Drop 时自动存储消息,避免二次通知。
190pub struct SilentAppendMsg<M> {
191    /// 要静默追加的消息
192    pub message: M,
193}
194
195impl<B: ContextBackend> Message<SilentAppendMsg<B::Message>> for AgentContext<B> {
196    type Reply = ();
197
198    async fn handle(
199        &mut self,
200        msg: SilentAppendMsg<B::Message>,
201        _ctx: &mut Context<Self, Self::Reply>,
202    ) -> Self::Reply {
203        self.incremental.push(msg.message);
204    }
205}
206
207/// 按全局索引获取消息。Reply = `Option<Message>`。
208///
209/// 索引按 immutable → compressed → incremental 顺序计算。
210/// 越界返回 `None`。
211pub struct Get(pub usize);
212
213impl<B: ContextBackend> Message<Get> for AgentContext<B> {
214    type Reply = Option<B::Message>;
215
216    async fn handle(&mut self, msg: Get, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
217        let idx = msg.0;
218        let imm_len = self.immutable.len();
219        let comp_len = self.compressed.len();
220        if idx < imm_len {
221            Some(self.immutable[idx].clone())
222        } else if idx < imm_len + comp_len {
223            Some(self.compressed[idx - imm_len].clone())
224        } else if idx < imm_len + comp_len + self.incremental.len() {
225            Some(self.incremental[idx - imm_len - comp_len].clone())
226        } else {
227            None
228        }
229    }
230}
231
232/// 获取三区全部消息的拼接结果。Reply = `Vec<Message>`。
233///
234/// 顺序:immutable → compressed → incremental。
235pub struct MessagesMsg;
236
237impl<B: ContextBackend> Message<MessagesMsg> for AgentContext<B> {
238    type Reply = Vec<B::Message>;
239
240    async fn handle(
241        &mut self,
242        _msg: MessagesMsg,
243        _ctx: &mut Context<Self, Self::Reply>,
244    ) -> Self::Reply {
245        self.immutable
246            .iter()
247            .chain(self.compressed.iter())
248            .chain(self.incremental.iter())
249            .cloned()
250            .collect()
251    }
252}
253
254/// 获取 immutable 区的消息副本。Reply = `Vec<Message>`。
255pub struct ImmutableMsg;
256
257impl<B: ContextBackend> Message<ImmutableMsg> for AgentContext<B> {
258    type Reply = Vec<B::Message>;
259
260    async fn handle(
261        &mut self,
262        _msg: ImmutableMsg,
263        _ctx: &mut Context<Self, Self::Reply>,
264    ) -> Self::Reply {
265        self.immutable.to_vec()
266    }
267}
268
269/// 获取 compressed 区的消息副本。Reply = `Vec<Message>`。
270pub struct CompressedMsg;
271
272impl<B: ContextBackend> Message<CompressedMsg> for AgentContext<B> {
273    type Reply = Vec<B::Message>;
274
275    async fn handle(
276        &mut self,
277        _msg: CompressedMsg,
278        _ctx: &mut Context<Self, Self::Reply>,
279    ) -> Self::Reply {
280        self.compressed.clone()
281    }
282}
283
284/// 获取 incremental 区的消息副本。Reply = `Vec<Message>`。
285pub struct IncrementalMsg;
286
287impl<B: ContextBackend> Message<IncrementalMsg> for AgentContext<B> {
288    type Reply = Vec<B::Message>;
289
290    async fn handle(
291        &mut self,
292        _msg: IncrementalMsg,
293        _ctx: &mut Context<Self, Self::Reply>,
294    ) -> Self::Reply {
295        self.incremental.clone()
296    }
297}
298
299/// 按角色筛选三区全部消息。Reply = `Vec<Message>`。
300pub struct FindByRoleMsg(pub Role);
301
302impl<B: ContextBackend> Message<FindByRoleMsg> for AgentContext<B> {
303    type Reply = Vec<B::Message>;
304
305    async fn handle(
306        &mut self,
307        msg: FindByRoleMsg,
308        _ctx: &mut Context<Self, Self::Reply>,
309    ) -> Self::Reply {
310        self.immutable
311            .iter()
312            .chain(self.compressed.iter())
313            .chain(self.incremental.iter())
314            .filter(|m| m.role() == msg.0)
315            .cloned()
316            .collect()
317    }
318}
319
320/// 替换 incremental 区指定索引的消息。Reply = `Result<(), AgentError>`。
321///
322/// 仅对 incremental 区有效。触发 [`ChangeEvent::Updated`]。
323pub struct UpdateMsg<M> {
324    /// incremental 区索引
325    pub index: usize,
326    /// 新消息
327    pub message: M,
328}
329
330impl<B: ContextBackend> Message<UpdateMsg<B::Message>> for AgentContext<B> {
331    type Reply = Result<(), AgentError>;
332
333    async fn handle(
334        &mut self,
335        msg: UpdateMsg<B::Message>,
336        _ctx: &mut Context<Self, Self::Reply>,
337    ) -> Self::Reply {
338        if msg.index >= self.incremental.len() {
339            return Err(AgentError::Context("索引越界".into()));
340        }
341        let old = std::mem::replace(&mut self.incremental[msg.index], msg.message);
342        if let Some(cb) = &self.on_change {
343            cb(ChangeEvent::Updated {
344                index: msg.index,
345                old,
346                new: self.incremental[msg.index].clone(),
347            });
348        }
349        Ok(())
350    }
351}
352
353/// 在 incremental 区指定索引插入消息。Reply = `Result<(), AgentError>`。
354///
355/// 触发 [`ChangeEvent::Inserted`]。
356pub struct InsertMsg<M> {
357    /// incremental 区插入位置
358    pub index: usize,
359    /// 要插入的消息
360    pub message: M,
361}
362
363impl<B: ContextBackend> Message<InsertMsg<B::Message>> for AgentContext<B> {
364    type Reply = Result<(), AgentError>;
365
366    async fn handle(
367        &mut self,
368        msg: InsertMsg<B::Message>,
369        _ctx: &mut Context<Self, Self::Reply>,
370    ) -> Self::Reply {
371        if msg.index > self.incremental.len() {
372            return Err(AgentError::Context("索引越界".into()));
373        }
374        self.incremental.insert(msg.index, msg.message);
375        if let Some(cb) = &self.on_change {
376            cb(ChangeEvent::Inserted {
377                index: msg.index,
378                message: self.incremental[msg.index].clone(),
379            });
380        }
381        Ok(())
382    }
383}
384
385/// 移除 incremental 区指定索引的消息。Reply = `Result<(), AgentError>`。
386///
387/// 触发 [`ChangeEvent::Removed`]。
388pub struct RemoveMsg {
389    /// incremental 区索引
390    pub index: usize,
391}
392
393impl<B: ContextBackend> Message<RemoveMsg> for AgentContext<B> {
394    type Reply = Result<(), AgentError>;
395
396    async fn handle(
397        &mut self,
398        msg: RemoveMsg,
399        _ctx: &mut Context<Self, Self::Reply>,
400    ) -> Self::Reply {
401        if msg.index >= self.incremental.len() {
402            return Err(AgentError::Context("索引越界".into()));
403        }
404        let removed = self.incremental.remove(msg.index);
405        if let Some(cb) = &self.on_change {
406            cb(ChangeEvent::Removed {
407                index: msg.index,
408                message: removed,
409            });
410        }
411        Ok(())
412    }
413}
414
415/// 弹出 incremental 区最后一条消息。Reply = `Option<Message>`。
416///
417/// 触发 [`ChangeEvent::Popped`]。
418pub struct PopMsg;
419
420impl<B: ContextBackend> Message<PopMsg> for AgentContext<B> {
421    type Reply = Option<B::Message>;
422
423    async fn handle(&mut self, _msg: PopMsg, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
424        let popped = self.incremental.pop();
425        if let Some(ref msg) = popped
426            && let Some(cb) = &self.on_change
427        {
428            cb(ChangeEvent::Popped(msg.clone()));
429        }
430        popped
431    }
432}
433
434/// 按角色保留 incremental 区消息,其余移除。Reply = `()`。
435///
436/// 触发 [`ChangeEvent::Retained`]。
437pub struct RetainMsg {
438    /// 要保留的角色
439    pub role: Role,
440}
441
442impl<B: ContextBackend> Message<RetainMsg> for AgentContext<B> {
443    type Reply = ();
444
445    async fn handle(
446        &mut self,
447        msg: RetainMsg,
448        _ctx: &mut Context<Self, Self::Reply>,
449    ) -> Self::Reply {
450        let mut removed = Vec::new();
451        let role = msg.role;
452        self.incremental.retain(|m| {
453            if m.role() == role {
454                true
455            } else {
456                removed.push(m.clone());
457                false
458            }
459        });
460        if let Some(cb) = &self.on_change {
461            cb(ChangeEvent::Retained { role, removed });
462        }
463    }
464}
465
466/// 清空整个 incremental 区。Reply = `()`。
467///
468/// 触发 [`ChangeEvent::Cleared`]。
469pub struct ClearMsg;
470
471impl<B: ContextBackend> Message<ClearMsg> for AgentContext<B> {
472    type Reply = ();
473
474    async fn handle(
475        &mut self,
476        _msg: ClearMsg,
477        _ctx: &mut Context<Self, Self::Reply>,
478    ) -> Self::Reply {
479        if !self.incremental.is_empty() {
480            let removed = std::mem::take(&mut self.incremental);
481            if let Some(cb) = &self.on_change {
482                cb(ChangeEvent::Cleared { removed });
483            }
484        }
485    }
486}
487
488/// 触发上下文压缩。Reply = `()`。
489///
490/// 根据 [`CompressStrategy`] 对 incremental 区执行压缩。
491/// 摘要由后端 LLM 生成,通过 `opts` 传递请求选项。
492pub struct CompressMsg<O> {
493    /// 压缩策略
494    pub strategy: CompressStrategy,
495    /// 传递给后端 `send()` 的请求选项
496    pub opts: O,
497}
498
499impl<B: ContextBackend> Message<CompressMsg<B::Opts>> for AgentContext<B> {
500    type Reply = ();
501
502    async fn handle(
503        &mut self,
504        msg: CompressMsg<B::Opts>,
505        _ctx: &mut Context<Self, Self::Reply>,
506    ) -> Self::Reply {
507        match msg.strategy {
508            CompressStrategy::Summarize { keep, prompt } => {
509                let total = self.incremental.len();
510                if total > keep {
511                    let split = total - keep;
512                    let to_summarize: Vec<B::Message> = self.incremental.drain(..split).collect();
513                    if !to_summarize.is_empty() {
514                        let summary_prompt = prompt.unwrap_or_else(Self::default_summary_prompt);
515                        let mut summary_messages =
516                            vec![self.backend.system_message(summary_prompt)];
517                        summary_messages.append(&mut self.compressed);
518                        summary_messages.extend(to_summarize);
519                        let result = self.backend.send(&summary_messages, &msg.opts).await;
520                        if let Ok(response) = result {
521                            if let Ok(raw_msgs) =
522                                self.backend.extract_messages_from_backend_response(
523                                    std::slice::from_ref(&response),
524                                )
525                            {
526                                if let Ok(request_msgs) = self.backend.to_request_messages(raw_msgs)
527                                {
528                                    let summary: Vec<B::Message> = request_msgs
529                                        .into_iter()
530                                        .map(|msg| self.backend.to_system_message(msg))
531                                        .collect();
532                                    let kept: Vec<B::Message> =
533                                        self.incremental.drain(..).collect();
534                                    let (final_summary, final_kept) =
535                                        if let Some(cb) = &self.on_compressed {
536                                            cb(summary, kept)
537                                        } else {
538                                            (summary, kept)
539                                        };
540                                    self.compressed = final_summary;
541                                    self.incremental = final_kept;
542                                } else {
543                                    log::warn!("压缩摘要转换请求格式失败,已跳过");
544                                }
545                            } else {
546                                log::warn!("压缩摘要提取消息失败,已跳过");
547                            }
548                        }
549                    }
550                }
551            }
552        }
553    }
554}
555
556/// 非流式对话。Reply = `Result<Response, AgentError>`。
557///
558/// 将三区消息拼接后,如 opts 实现了 [`ScratchOpts`] 且 `scratch()` 返回 `Some`,
559/// 追加一条 system 消息作为 Scratch,然后发送给后端 LLM。
560/// 成功后自动将响应消息存入 incremental 区,触发 [`ChangeEvent::Appended`]。
561pub struct SendMsg<O> {
562    /// 传递给后端 `send()` 的请求选项(需实现 [`ScratchOpts`])
563    pub opts: O,
564}
565
566impl<B: ContextBackend> Message<SendMsg<B::Opts>> for AgentContext<B> {
567    type Reply = Result<B::Response, AgentError>;
568
569    async fn handle(
570        &mut self,
571        msg: SendMsg<B::Opts>,
572        _ctx: &mut Context<Self, Self::Reply>,
573    ) -> Self::Reply {
574        let scratch = msg.opts.scratch().map(|s| s.to_string());
575        let mut all_messages: Vec<B::Message> = self
576            .immutable
577            .iter()
578            .chain(self.compressed.iter())
579            .chain(self.incremental.iter())
580            .cloned()
581            .collect();
582        if let Some(content) = scratch {
583            all_messages.push(self.backend.system_message(content));
584        }
585        let response = self.backend.send(&all_messages, &msg.opts).await?;
586        let raw_msgs = self
587            .backend
588            .extract_messages_from_backend_response(std::slice::from_ref(&response))?;
589        let request_msgs = self.backend.to_request_messages(raw_msgs)?;
590        for msg in &request_msgs {
591            self.incremental.push(msg.clone());
592            if let Some(cb) = &self.on_change {
593                cb(ChangeEvent::Appended(msg.clone()));
594            }
595        }
596        Ok(response)
597    }
598}
599
600/// 流式对话。Reply = [`AgentSendStream<B>`]。
601///
602/// 将三区消息拼接后,如 opts 实现了 [`ScratchOpts`] 且 `scratch()` 返回 `Some`,
603/// 追加一条 system 消息作为 Scratch,然后发送给后端 LLM。
604/// 调用者消费流直到结束,drop 时自动将响应消息存入 incremental 区。
605pub struct SendStreamMsg<O> {
606    /// 传递给后端 `send_stream()` 的请求选项(需实现 [`ScratchOpts`])
607    pub opts: O,
608}
609
610impl<B: ContextBackend + Clone> Message<SendStreamMsg<B::Opts>> for AgentContext<B> {
611    type Reply = AgentSendStream<B>;
612
613    async fn handle(
614        &mut self,
615        msg: SendStreamMsg<B::Opts>,
616        ctx: &mut Context<Self, Self::Reply>,
617    ) -> Self::Reply {
618        let scratch = msg.opts.scratch().map(|s| s.to_string());
619        let mut all_messages: Vec<B::Message> = self
620            .immutable
621            .iter()
622            .chain(self.compressed.iter())
623            .chain(self.incremental.iter())
624            .cloned()
625            .collect();
626        if let Some(content) = scratch {
627            all_messages.push(self.backend.system_message(content));
628        }
629        let stream = self.backend.send_stream(all_messages, msg.opts);
630        AgentSendStream::new(
631            self.backend.clone(),
632            stream,
633            ctx.actor_ref().clone(),
634            self.on_change.clone(),
635        )
636    }
637}
638
639/// 估算三区全部消息的 token 数量。Reply = `usize`。
640///
641/// 委托给后端的 [`estimate_tokens`](ContextBackend::estimate_tokens)。
642/// 失败时降级返回 0。
643pub struct EstimateTokensMsg;
644
645impl<B: ContextBackend> Message<EstimateTokensMsg> for AgentContext<B> {
646    type Reply = usize;
647
648    async fn handle(
649        &mut self,
650        _msg: EstimateTokensMsg,
651        _ctx: &mut Context<Self, Self::Reply>,
652    ) -> Self::Reply {
653        let all: Vec<B::Message> = self
654            .immutable
655            .iter()
656            .chain(self.compressed.iter())
657            .chain(self.incremental.iter())
658            .cloned()
659            .collect();
660        self.backend.estimate_tokens(&all).await.unwrap_or(0)
661    }
662}
663
664/// 检查上下文是否已满(token 数 >= [`context_window`](ContextBackend::context_window))。Reply = `bool`。
665///
666/// 估算失败时降级返回 `true`(安全策略:宁可误判已满,不可溢出)。
667pub struct IsFullMsg;
668
669impl<B: ContextBackend> Message<IsFullMsg> for AgentContext<B> {
670    type Reply = bool;
671
672    async fn handle(
673        &mut self,
674        _msg: IsFullMsg,
675        _ctx: &mut Context<Self, Self::Reply>,
676    ) -> Self::Reply {
677        let all: Vec<B::Message> = self
678            .immutable
679            .iter()
680            .chain(self.compressed.iter())
681            .chain(self.incremental.iter())
682            .cloned()
683            .collect();
684        let tokens = self
685            .backend
686            .estimate_tokens(&all)
687            .await
688            .unwrap_or(usize::MAX);
689        tokens >= self.backend.context_window()
690    }
691}
692
693/// 将三区全部消息导出为 JSONL 字符串,每行一条 JSON。Reply = `Result<String, AgentError>`。
694///
695/// 消息按 immutable → compressed → incremental 顺序输出。
696pub struct ToJsonlMsg;
697
698impl<B: ContextBackend> Message<ToJsonlMsg> for AgentContext<B> {
699    type Reply = Result<String, AgentError>;
700
701    async fn handle(
702        &mut self,
703        _msg: ToJsonlMsg,
704        _ctx: &mut Context<Self, Self::Reply>,
705    ) -> Self::Reply {
706        let lines: Vec<String> = self
707            .immutable
708            .iter()
709            .chain(self.compressed.iter())
710            .chain(self.incremental.iter())
711            .map(|m| self.backend.message_to_jsonl(m))
712            .collect::<Result<_, _>>()?;
713        Ok(lines.join("\n"))
714    }
715}
716
717/// 从 JSONL 字符串加载消息到 incremental 区。Reply = `Result<(), AgentError>`。
718///
719/// 每行一条 JSON,空行跳过。解析失败或触发 `preserve_reasoning` 时返回错误。
720/// 加载的消息逐条触发 [`ChangeEvent::Appended`] 回调。
721pub struct FromJsonlMsg {
722    /// JSONL 字符串,每行一条消息
723    pub jsonl: String,
724}
725
726impl<B: ContextBackend> Message<FromJsonlMsg> for AgentContext<B> {
727    type Reply = Result<(), AgentError>;
728
729    async fn handle(
730        &mut self,
731        msg: FromJsonlMsg,
732        _ctx: &mut Context<Self, Self::Reply>,
733    ) -> Self::Reply {
734        for line in msg.jsonl.lines() {
735            let line = line.trim();
736            if line.is_empty() {
737                continue;
738            }
739            let message: B::Message = self.backend.message_from_jsonl(line)?;
740            self.incremental.push(message.clone());
741            if let Some(ref cb) = self.on_change {
742                cb(ChangeEvent::Appended(message));
743            }
744        }
745        Ok(())
746    }
747}