use std::sync::{Arc, Mutex};
use crate::agent::Agent;
use crate::channel::Channel;
#[cfg(test)]
pub(super) use zeph_agent_context::summarization::scheduling::parse_subgoal_extraction_response;
impl<C: Channel> Agent<C> {
pub(in crate::agent) async fn maybe_compact(
&mut self,
) -> Result<(), crate::agent::error::AgentError> {
let svc = zeph_agent_context::ContextService::new();
let providers = self.providers();
let status = CollectStatusSink::default();
let turns_before = self.context_manager.turns_since_last_hard_compaction;
let msg_count_before = self.msg.messages.len();
let mut summ = self.summarization_view();
svc.maybe_compact(&mut summ, &providers, &status)
.await
.map_err(|e| crate::agent::error::AgentError::ContextError(format!("{e:#}")))?;
let collected = status.take();
for msg in &collected {
let _ = self.channel.send_status(msg).await;
}
if let Some(ref tx) = self.services.session.status_tx {
for msg in &collected {
let _ = tx.send(msg.clone());
}
}
let msg_count_after = self.msg.messages.len();
let compacted = msg_count_after < msg_count_before;
let hard_fired = self.context_manager.turns_since_last_hard_compaction == Some(0)
&& turns_before != Some(0);
if compacted {
self.update_metrics(|m| m.context_compactions += 1);
}
if hard_fired {
let turns_segment = turns_before.unwrap_or(0);
self.update_metrics(|m| {
m.compaction_hard_count += 1;
if turns_segment > 0 {
m.compaction_turns_after_hard.push(turns_segment);
}
});
}
Ok(())
}
/// Delegates to `ContextService::maybe_soft_compact_mid_iteration` using this agent's
/// summarization view.
///
/// The call is synchronous and whatever the service call yields is discarded.
pub(in crate::agent) fn maybe_soft_compact_mid_iteration(&mut self) {
    let mut view = self.summarization_view();
    zeph_agent_context::ContextService::new().maybe_soft_compact_mid_iteration(&mut view);
}
/// Runs `ContextService::maybe_proactive_compress` over this agent's summarization
/// view, with the loaded compression guidelines attached to the view.
///
/// Status messages from the service go directly to a clone of the session status
/// sender (when one is configured) through `TxStatusSink`.
///
/// # Errors
///
/// As written, this function always returns `Ok(())`: the value produced by the
/// service call is discarded.
// NOTE(review): if the service call can report a failure, it is dropped here; confirm
// against `ContextService::maybe_proactive_compress` that this is intended.
pub(in crate::agent) async fn maybe_proactive_compress(
    &mut self,
) -> Result<(), crate::agent::error::AgentError> {
    // Load the guidelines first; they are moved into the view below.
    let guidelines = self.load_compression_guidelines_for_compact().await;

    let service = zeph_agent_context::ContextService::new();
    let providers = self.providers();
    let sink = TxStatusSink(self.services.session.status_tx.clone());

    let view = self.summarization_view();
    let mut view = view.with_compression_guidelines(guidelines);
    service
        .maybe_proactive_compress(&mut view, &providers, &sink)
        .await;
    Ok(())
}
/// Reports a completed compaction when the cached prompt token count has dropped
/// below `tokens_before`.
///
/// When the current `cached_prompt_tokens` value is not lower than `tokens_before`,
/// nothing happens. Otherwise this:
/// 1. emits an info-level tracing event with the before/after counts and the saving,
/// 2. sends a `Compacting: …` status line to the channel (send errors are ignored),
/// 3. stores the before/after counts and a wall-clock timestamp in the metrics.
///
/// The timestamp is milliseconds since the Unix epoch. It falls back to `0` when the
/// elapsed-time query fails and to `u64::MAX` when the value does not fit in a `u64`.
pub(in crate::agent) async fn emit_compaction_status_signal(&mut self, tokens_before: u64) {
    let tokens_after = self.runtime.providers.cached_prompt_tokens;
    if tokens_after >= tokens_before {
        return;
    }

    let since_epoch = std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default();
    let now_ms = u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX);

    let saved = tokens_before.saturating_sub(tokens_after);
    tracing::info!(
        tokens_before,
        tokens_after,
        saved,
        "context compaction complete"
    );

    let status_line = format!("Compacting: {tokens_before}→{tokens_after} tokens");
    let _ = self.channel.send_status(&status_line).await;

    self.update_metrics(|m| {
        m.compaction_last_before = tokens_before;
        m.compaction_last_after = tokens_after;
        m.compaction_last_at_ms = now_ms;
    });
}
}
/// Status sink that hands each message to an optional unbounded sender.
///
/// With no sender (`None`) every message is dropped.
struct TxStatusSink(Option<tokio::sync::mpsc::UnboundedSender<String>>);

impl zeph_agent_context::StatusSink for TxStatusSink {
    /// Sends an owned copy of `msg` to the wrapped sender, if there is one.
    ///
    /// The send happens synchronously, before the future is returned, so the returned
    /// future is already complete. A failed send is ignored.
    fn send_status(&self, msg: &str) -> impl std::future::Future<Output = ()> + Send + '_ {
        if let Some(sender) = self.0.as_ref() {
            let _ = sender.send(String::from(msg));
        }
        std::future::ready(())
    }
}
/// Status sink that buffers messages in a shared, mutex-guarded vector so they can be
/// drained later with [`CollectStatusSink::take`].
#[derive(Default)]
struct CollectStatusSink(Arc<Mutex<Vec<String>>>);

impl CollectStatusSink {
    /// Removes and returns every buffered message, leaving the buffer empty.
    ///
    /// A poisoned lock is recovered instead of panicking: the guarded value is a plain
    /// `Vec<String>`, which remains valid even if another holder panicked, and losing
    /// status messages to a second panic here would only hide the original failure.
    fn take(&self) -> Vec<String> {
        let mut buffered = self
            .0
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        std::mem::take(&mut *buffered)
    }
}
impl zeph_agent_context::StatusSink for CollectStatusSink {
    /// Appends an owned copy of `msg` to the shared buffer.
    ///
    /// The push happens synchronously, before the future is returned, so the returned
    /// future is already complete. The lock is released as soon as the push finishes.
    ///
    /// A poisoned lock is recovered instead of panicking: the guarded value is a plain
    /// `Vec<String>`, which remains valid even if another holder panicked.
    fn send_status(&self, msg: &str) -> impl std::future::Future<Output = ()> + Send + '_ {
        self.0
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push(msg.to_owned());
        std::future::ready(())
    }
}