codetether_agent/autochat/
shared_context.rs1use crate::bus::{BusHandle, BusMessage};
4use crate::provider::Provider;
5use crate::rlm::{FinalPayload, RlmExecutor};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::sync::Arc;
8
9const CONTEXT_KEY_PREFIX: &str = "relay.";
10const CONTEXT_BUCKET_LIMIT: usize = 12;
11
12#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
13pub struct ContextDelta {
14 #[serde(default)]
15 pub facts: Vec<String>,
16 #[serde(default)]
17 pub decisions: Vec<String>,
18 #[serde(default)]
19 pub risks: Vec<String>,
20 #[serde(default)]
21 pub next_actions: Vec<String>,
22 #[serde(default)]
23 pub evidence: Vec<String>,
24}
25
26#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
27pub struct SharedRelayContext {
28 #[serde(default)]
29 pub facts: Vec<String>,
30 #[serde(default)]
31 pub decisions: Vec<String>,
32 #[serde(default)]
33 pub risks: Vec<String>,
34 #[serde(default)]
35 pub next_actions: Vec<String>,
36 #[serde(default)]
37 pub evidence: Vec<String>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ContextDeltaEnvelope {
42 pub relay_id: String,
43 pub from_agent: String,
44 pub round: usize,
45 pub turn: usize,
46 pub delta: ContextDelta,
47}
48
49impl SharedRelayContext {
50 pub fn merge_delta(&mut self, delta: &ContextDelta) {
51 merge_bucket(&mut self.facts, &delta.facts);
52 merge_bucket(&mut self.decisions, &delta.decisions);
53 merge_bucket(&mut self.risks, &delta.risks);
54 merge_bucket(&mut self.next_actions, &delta.next_actions);
55 merge_bucket(&mut self.evidence, &delta.evidence);
56 }
57
58 pub fn is_empty(&self) -> bool {
59 self.facts.is_empty()
60 && self.decisions.is_empty()
61 && self.risks.is_empty()
62 && self.next_actions.is_empty()
63 && self.evidence.is_empty()
64 }
65
66 pub fn item_count(&self) -> usize {
67 self.facts.len()
68 + self.decisions.len()
69 + self.risks.len()
70 + self.next_actions.len()
71 + self.evidence.len()
72 }
73
74 pub fn render_prompt_snapshot(&self) -> String {
75 if self.is_empty() {
76 return String::new();
77 }
78
79 let mut lines = Vec::new();
80 push_section(&mut lines, "Facts", &self.facts);
81 push_section(&mut lines, "Decisions", &self.decisions);
82 push_section(&mut lines, "Risks", &self.risks);
83 push_section(&mut lines, "Next Actions", &self.next_actions);
84 push_section(&mut lines, "Evidence", &self.evidence);
85 lines.join("\n")
86 }
87}
88
89pub fn context_result_key(relay_id: &str, turn: usize) -> String {
90 format!("{CONTEXT_KEY_PREFIX}{relay_id}.context.{turn:04}")
91}
92
93pub fn context_result_prefix(relay_id: &str) -> String {
94 format!("{CONTEXT_KEY_PREFIX}{relay_id}.context.")
95}
96
97pub fn publish_context_delta(
98 handle: &BusHandle,
99 relay_id: &str,
100 from_agent: &str,
101 round: usize,
102 turn: usize,
103 delta: &ContextDelta,
104) -> usize {
105 let envelope = ContextDeltaEnvelope {
106 relay_id: relay_id.to_string(),
107 from_agent: from_agent.to_string(),
108 round,
109 turn,
110 delta: delta.clone(),
111 };
112
113 let value = serde_json::to_value(&envelope).unwrap_or(serde_json::json!({}));
114 let tags = vec![
115 "relay-context".to_string(),
116 format!("relay:{relay_id}"),
117 format!("agent:{from_agent}"),
118 ];
119 handle.publish_shared_result(context_result_key(relay_id, turn), value, tags)
120}
121
122pub fn drain_context_updates(
123 receiver: &mut BusHandle,
124 relay_id: &str,
125 shared: &mut SharedRelayContext,
126) -> usize {
127 let topic_prefix = format!("results.{}", context_result_prefix(relay_id));
128 let key_prefix = context_result_prefix(relay_id);
129 let mut merged = 0usize;
130
131 while let Some(envelope) = receiver.try_recv() {
132 if !envelope.topic.starts_with(&topic_prefix) {
133 continue;
134 }
135
136 let BusMessage::SharedResult { key, value, .. } = envelope.message else {
137 continue;
138 };
139 if !key.starts_with(&key_prefix) {
140 continue;
141 }
142
143 if let Ok(payload) = serde_json::from_value::<ContextDeltaEnvelope>(value.clone()) {
144 if payload.relay_id == relay_id {
145 shared.merge_delta(&payload.delta);
146 merged += 1;
147 }
148 continue;
149 }
150
151 if let Ok(delta) = serde_json::from_value::<ContextDelta>(value) {
152 shared.merge_delta(&delta);
153 merged += 1;
154 }
155 }
156
157 merged
158}
159
160pub fn compose_prompt_with_context(handoff: &str, shared: &SharedRelayContext) -> String {
161 let snapshot = shared.render_prompt_snapshot();
162 if snapshot.is_empty() {
163 return handoff.to_string();
164 }
165
166 format!(
167 "{handoff}\n\nShared Relay Context (cross-agent memory):\n{snapshot}\n\n\
168Use this shared context as authoritative state across the relay."
169 )
170}
171
172pub async fn distill_context_delta_with_rlm(
173 output: &str,
174 task: &str,
175 from_agent: &str,
176 provider_and_model: Option<(Arc<dyn Provider>, String)>,
177) -> (ContextDelta, bool) {
178 if let Some((provider, model_name)) = provider_and_model {
179 let query = context_distillation_query(task, from_agent);
180 let mut executor =
181 RlmExecutor::new(output.to_string(), provider, model_name).with_max_iterations(2);
182 match executor.analyze(&query).await {
183 Ok(result) => {
184 if let Some(delta) = extract_context_delta_from_text(&result.answer) {
185 return (delta, true);
186 }
187 }
188 Err(err) => {
189 tracing::warn!(
190 error = %err,
191 agent = %from_agent,
192 "RLM context distillation failed; using fallback delta"
193 );
194 }
195 }
196 }
197
198 (fallback_context_delta(output, from_agent), false)
199}
200
201pub fn parse_json_payload<T: DeserializeOwned>(text: &str) -> Option<T> {
202 let trimmed = text.trim();
203 if let Ok(value) = serde_json::from_str::<T>(trimmed) {
204 return Some(value);
205 }
206
207 if let (Some(start), Some(end)) = (trimmed.find('{'), trimmed.rfind('}'))
208 && start < end
209 && let Ok(value) = serde_json::from_str::<T>(&trimmed[start..=end])
210 {
211 return Some(value);
212 }
213
214 None
215}
216
217fn context_distillation_query(task: &str, from_agent: &str) -> String {
218 format!(
219 "Task:\n{task}\n\nAgent turn source: @{from_agent}\n\n\
220Distill this turn into shared relay context.\n\
221Return FINAL(JSON) with this shape:\n\
222{{\"kind\":\"semantic\",\"file\":\"relay_context_delta\",\"answer\":\"{{\\\"facts\\\":[\\\"...\\\"],\\\"decisions\\\":[\\\"...\\\"],\\\"risks\\\":[\\\"...\\\"],\\\"next_actions\\\":[\\\"...\\\"],\\\"evidence\\\":[\\\"...\\\"]}}\"}}\n\
223Rules:\n\
224- answer MUST be valid JSON object encoded as a string\n\
225- 0-3 items per field\n\
226- no markdown"
227 )
228}
229
230fn extract_context_delta_from_text(text: &str) -> Option<ContextDelta> {
231 if let Some(delta) = parse_json_payload::<ContextDelta>(text) {
232 return Some(delta);
233 }
234
235 if let FinalPayload::Semantic(payload) = FinalPayload::parse(text)
236 && let Some(delta) = parse_json_payload::<ContextDelta>(&payload.answer)
237 {
238 return Some(delta);
239 }
240
241 None
242}
243
244fn fallback_context_delta(output: &str, from_agent: &str) -> ContextDelta {
245 let excerpt = truncate_chars(output.trim(), 240);
246 let fact = if excerpt.is_empty() {
247 format!("@{from_agent} produced an empty turn output")
248 } else {
249 format!("@{from_agent}: {excerpt}")
250 };
251
252 ContextDelta {
253 facts: vec![fact],
254 decisions: Vec::new(),
255 risks: Vec::new(),
256 next_actions: vec!["Continue with one concrete implementation step.".to_string()],
257 evidence: vec![format!("output_chars={}", output.chars().count())],
258 }
259}
260
261fn merge_bucket(target: &mut Vec<String>, additions: &[String]) {
262 for item in additions {
263 let normalized = item.trim();
264 if normalized.is_empty() {
265 continue;
266 }
267 if target.iter().any(|existing| existing == normalized) {
268 continue;
269 }
270 target.push(normalized.to_string());
271 if target.len() > CONTEXT_BUCKET_LIMIT {
272 target.remove(0);
273 }
274 }
275}
276
277fn push_section(lines: &mut Vec<String>, title: &str, values: &[String]) {
278 if values.is_empty() {
279 return;
280 }
281 lines.push(format!("{title}:"));
282 lines.extend(values.iter().map(|v| format!("- {v}")));
283}
284
285fn truncate_chars(input: &str, max_chars: usize) -> String {
286 if input.chars().count() <= max_chars {
287 return input.to_string();
288 }
289 input.chars().take(max_chars).collect::<String>() + "..."
290}
291
292#[cfg(test)]
293mod tests {
294 use super::{ContextDelta, SharedRelayContext, parse_json_payload};
295
296 #[test]
297 fn parse_json_payload_extracts_embedded_object() {
298 let parsed = parse_json_payload::<ContextDelta>(
299 "noise {\"facts\":[\"a\"],\"decisions\":[],\"risks\":[],\"next_actions\":[],\"evidence\":[]} trailing",
300 );
301 assert!(parsed.is_some());
302 }
303
304 #[test]
305 fn merge_delta_deduplicates_items() {
306 let mut shared = SharedRelayContext::default();
307 let delta = ContextDelta {
308 facts: vec!["same".to_string(), "same".to_string()],
309 decisions: Vec::new(),
310 risks: Vec::new(),
311 next_actions: Vec::new(),
312 evidence: Vec::new(),
313 };
314 shared.merge_delta(&delta);
315 assert_eq!(shared.facts.len(), 1);
316 }
317
318 #[test]
319 fn render_prompt_snapshot_has_sections() {
320 let mut shared = SharedRelayContext::default();
321 shared.facts.push("f1".to_string());
322 let rendered = shared.render_prompt_snapshot();
323 assert!(rendered.contains("Facts:"));
324 assert!(rendered.contains("- f1"));
325 }
326}