Skip to main content

codetether_agent/autochat/
shared_context.rs

1//! Shared relay context built from structured RLM deltas.
2
3use crate::bus::{BusHandle, BusMessage};
4use crate::provider::Provider;
5use crate::rlm::{FinalPayload, RlmExecutor};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use std::sync::Arc;
8
/// Leading segment of every relay-context result key built by `context_result_key` and
/// `context_result_prefix`.
const CONTEXT_KEY_PREFIX: &str = "relay.";
/// Maximum number of entries `merge_bucket` keeps in one bucket; once a push exceeds this,
/// the oldest entry (index 0) is removed.
const CONTEXT_BUCKET_LIMIT: usize = 12;
11
/// A set of context items, grouped into five buckets, that can be merged into a
/// [`SharedRelayContext`].
///
/// Every field is `#[serde(default)]`, so a JSON object that omits a bucket still
/// deserializes and the missing bucket becomes an empty list.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ContextDelta {
    /// Entries destined for the "Facts" section of the rendered snapshot.
    #[serde(default)]
    pub facts: Vec<String>,
    /// Entries destined for the "Decisions" section.
    #[serde(default)]
    pub decisions: Vec<String>,
    /// Entries destined for the "Risks" section.
    #[serde(default)]
    pub risks: Vec<String>,
    /// Entries destined for the "Next Actions" section.
    #[serde(default)]
    pub next_actions: Vec<String>,
    /// Entries destined for the "Evidence" section.
    #[serde(default)]
    pub evidence: Vec<String>,
}
25
/// Accumulated relay context: the same five buckets as [`ContextDelta`], grown by
/// `merge_delta` and rendered into prompt text by `render_prompt_snapshot`.
///
/// Every field is `#[serde(default)]`, so missing buckets deserialize as empty lists.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SharedRelayContext {
    /// Entries rendered under the "Facts" heading.
    #[serde(default)]
    pub facts: Vec<String>,
    /// Entries rendered under the "Decisions" heading.
    #[serde(default)]
    pub decisions: Vec<String>,
    /// Entries rendered under the "Risks" heading.
    #[serde(default)]
    pub risks: Vec<String>,
    /// Entries rendered under the "Next Actions" heading.
    #[serde(default)]
    pub next_actions: Vec<String>,
    /// Entries rendered under the "Evidence" heading.
    #[serde(default)]
    pub evidence: Vec<String>,
}
39
/// Wire format published by `publish_context_delta`: a [`ContextDelta`] plus the metadata
/// identifying where it came from.
///
/// None of these fields has a serde default, so all five must be present for a JSON value
/// to deserialize as an envelope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextDeltaEnvelope {
    /// Relay the delta belongs to; `drain_context_updates` only merges envelopes whose
    /// `relay_id` equals the one it was asked to drain.
    pub relay_id: String,
    /// Name of the agent the delta is attributed to (also published as an `agent:` tag).
    pub from_agent: String,
    /// Round number supplied by the publisher (not interpreted in this module).
    pub round: usize,
    /// Turn number supplied by the publisher; also used to build the result key.
    pub turn: usize,
    /// The context items being shared.
    pub delta: ContextDelta,
}
48
49impl SharedRelayContext {
50    pub fn merge_delta(&mut self, delta: &ContextDelta) {
51        merge_bucket(&mut self.facts, &delta.facts);
52        merge_bucket(&mut self.decisions, &delta.decisions);
53        merge_bucket(&mut self.risks, &delta.risks);
54        merge_bucket(&mut self.next_actions, &delta.next_actions);
55        merge_bucket(&mut self.evidence, &delta.evidence);
56    }
57
58    pub fn is_empty(&self) -> bool {
59        self.facts.is_empty()
60            && self.decisions.is_empty()
61            && self.risks.is_empty()
62            && self.next_actions.is_empty()
63            && self.evidence.is_empty()
64    }
65
66    pub fn item_count(&self) -> usize {
67        self.facts.len()
68            + self.decisions.len()
69            + self.risks.len()
70            + self.next_actions.len()
71            + self.evidence.len()
72    }
73
74    pub fn render_prompt_snapshot(&self) -> String {
75        if self.is_empty() {
76            return String::new();
77        }
78
79        let mut lines = Vec::new();
80        push_section(&mut lines, "Facts", &self.facts);
81        push_section(&mut lines, "Decisions", &self.decisions);
82        push_section(&mut lines, "Risks", &self.risks);
83        push_section(&mut lines, "Next Actions", &self.next_actions);
84        push_section(&mut lines, "Evidence", &self.evidence);
85        lines.join("\n")
86    }
87}
88
89pub fn context_result_key(relay_id: &str, turn: usize) -> String {
90    format!("{CONTEXT_KEY_PREFIX}{relay_id}.context.{turn:04}")
91}
92
93pub fn context_result_prefix(relay_id: &str) -> String {
94    format!("{CONTEXT_KEY_PREFIX}{relay_id}.context.")
95}
96
97pub fn publish_context_delta(
98    handle: &BusHandle,
99    relay_id: &str,
100    from_agent: &str,
101    round: usize,
102    turn: usize,
103    delta: &ContextDelta,
104) -> usize {
105    let envelope = ContextDeltaEnvelope {
106        relay_id: relay_id.to_string(),
107        from_agent: from_agent.to_string(),
108        round,
109        turn,
110        delta: delta.clone(),
111    };
112
113    let value = serde_json::to_value(&envelope).unwrap_or(serde_json::json!({}));
114    let tags = vec![
115        "relay-context".to_string(),
116        format!("relay:{relay_id}"),
117        format!("agent:{from_agent}"),
118    ];
119    handle.publish_shared_result(context_result_key(relay_id, turn), value, tags)
120}
121
122pub fn drain_context_updates(
123    receiver: &mut BusHandle,
124    relay_id: &str,
125    shared: &mut SharedRelayContext,
126) -> usize {
127    let topic_prefix = format!("results.{}", context_result_prefix(relay_id));
128    let key_prefix = context_result_prefix(relay_id);
129    let mut merged = 0usize;
130
131    while let Some(envelope) = receiver.try_recv() {
132        if !envelope.topic.starts_with(&topic_prefix) {
133            continue;
134        }
135
136        let BusMessage::SharedResult { key, value, .. } = envelope.message else {
137            continue;
138        };
139        if !key.starts_with(&key_prefix) {
140            continue;
141        }
142
143        if let Ok(payload) = serde_json::from_value::<ContextDeltaEnvelope>(value.clone()) {
144            if payload.relay_id == relay_id {
145                shared.merge_delta(&payload.delta);
146                merged += 1;
147            }
148            continue;
149        }
150
151        if let Ok(delta) = serde_json::from_value::<ContextDelta>(value) {
152            shared.merge_delta(&delta);
153            merged += 1;
154        }
155    }
156
157    merged
158}
159
160pub fn compose_prompt_with_context(handoff: &str, shared: &SharedRelayContext) -> String {
161    let snapshot = shared.render_prompt_snapshot();
162    if snapshot.is_empty() {
163        return handoff.to_string();
164    }
165
166    format!(
167        "{handoff}\n\nShared Relay Context (cross-agent memory):\n{snapshot}\n\n\
168Use this shared context as authoritative state across the relay."
169    )
170}
171
172pub async fn distill_context_delta_with_rlm(
173    output: &str,
174    task: &str,
175    from_agent: &str,
176    provider_and_model: Option<(Arc<dyn Provider>, String)>,
177) -> (ContextDelta, bool) {
178    if let Some((provider, model_name)) = provider_and_model {
179        let query = context_distillation_query(task, from_agent);
180        let mut executor =
181            RlmExecutor::new(output.to_string(), provider, model_name).with_max_iterations(2);
182        match executor.analyze(&query).await {
183            Ok(result) => {
184                if let Some(delta) = extract_context_delta_from_text(&result.answer) {
185                    return (delta, true);
186                }
187            }
188            Err(err) => {
189                tracing::warn!(
190                    error = %err,
191                    agent = %from_agent,
192                    "RLM context distillation failed; using fallback delta"
193                );
194            }
195        }
196    }
197
198    (fallback_context_delta(output, from_agent), false)
199}
200
201pub fn parse_json_payload<T: DeserializeOwned>(text: &str) -> Option<T> {
202    let trimmed = text.trim();
203    if let Ok(value) = serde_json::from_str::<T>(trimmed) {
204        return Some(value);
205    }
206
207    if let (Some(start), Some(end)) = (trimmed.find('{'), trimmed.rfind('}'))
208        && start < end
209        && let Ok(value) = serde_json::from_str::<T>(&trimmed[start..=end])
210    {
211        return Some(value);
212    }
213
214    None
215}
216
/// Builds the RLM query asking for a turn to be distilled into relay context.
///
/// The query states the task and the originating agent, then specifies the expected
/// `FINAL(JSON)` shape (a semantic payload whose `answer` is the delta JSON encoded as a
/// string) and the formatting rules.
fn context_distillation_query(task: &str, from_agent: &str) -> String {
    // Literal example of the requested response; kept raw so no brace or quote escaping is needed.
    const RESPONSE_SHAPE: &str = r#"{"kind":"semantic","file":"relay_context_delta","answer":"{\"facts\":[\"...\"],\"decisions\":[\"...\"],\"risks\":[\"...\"],\"next_actions\":[\"...\"],\"evidence\":[\"...\"]}"}"#;
    const RULES: [&str; 3] = [
        "- answer MUST be valid JSON object encoded as a string",
        "- 0-3 items per field",
        "- no markdown",
    ];

    let rules = RULES.join("\n");
    let header = format!("Task:\n{task}\n\nAgent turn source: @{from_agent}\n\n");
    let instructions = format!(
        "Distill this turn into shared relay context.\nReturn FINAL(JSON) with this shape:\n{RESPONSE_SHAPE}\nRules:\n{rules}"
    );
    header + &instructions
}
229
230fn extract_context_delta_from_text(text: &str) -> Option<ContextDelta> {
231    if let Some(delta) = parse_json_payload::<ContextDelta>(text) {
232        return Some(delta);
233    }
234
235    if let FinalPayload::Semantic(payload) = FinalPayload::parse(text)
236        && let Some(delta) = parse_json_payload::<ContextDelta>(&payload.answer)
237    {
238        return Some(delta);
239    }
240
241    None
242}
243
244fn fallback_context_delta(output: &str, from_agent: &str) -> ContextDelta {
245    let excerpt = truncate_chars(output.trim(), 240);
246    let fact = if excerpt.is_empty() {
247        format!("@{from_agent} produced an empty turn output")
248    } else {
249        format!("@{from_agent}: {excerpt}")
250    };
251
252    ContextDelta {
253        facts: vec![fact],
254        decisions: Vec::new(),
255        risks: Vec::new(),
256        next_actions: vec!["Continue with one concrete implementation step.".to_string()],
257        evidence: vec![format!("output_chars={}", output.chars().count())],
258    }
259}
260
261fn merge_bucket(target: &mut Vec<String>, additions: &[String]) {
262    for item in additions {
263        let normalized = item.trim();
264        if normalized.is_empty() {
265            continue;
266        }
267        if target.iter().any(|existing| existing == normalized) {
268            continue;
269        }
270        target.push(normalized.to_string());
271        if target.len() > CONTEXT_BUCKET_LIMIT {
272            target.remove(0);
273        }
274    }
275}
276
/// Appends one rendered section to `lines`: a `title:` heading followed by a `- value`
/// line for each entry. Nothing is appended when `values` is empty.
fn push_section(lines: &mut Vec<String>, title: &str, values: &[String]) {
    if !values.is_empty() {
        lines.push(format!("{title}:"));
        for value in values {
            lines.push(format!("- {value}"));
        }
    }
}
284
/// Limits `input` to its first `max_chars` characters (Unicode scalar values, not bytes).
///
/// Input that already fits is returned unchanged; longer input is cut and has `...`
/// appended, so a truncated result is `max_chars + 3` characters long.
fn truncate_chars(input: &str, max_chars: usize) -> String {
    // A character at index `max_chars` exists exactly when the input is too long; its byte
    // offset is where the kept prefix ends.
    match input.char_indices().nth(max_chars) {
        None => input.to_string(),
        Some((cut, _)) => {
            let mut shortened = String::from(&input[..cut]);
            shortened.push_str("...");
            shortened
        }
    }
}
291
#[cfg(test)]
mod tests {
    use super::{ContextDelta, SharedRelayContext, parse_json_payload};

    /// A JSON object wrapped in leading and trailing noise is still recovered.
    #[test]
    fn parse_json_payload_extracts_embedded_object() {
        let noisy = "noise {\"facts\":[\"a\"],\"decisions\":[],\"risks\":[],\"next_actions\":[],\"evidence\":[]} trailing";
        let parsed: Option<ContextDelta> = parse_json_payload(noisy);
        assert!(parsed.is_some());
    }

    /// The same entry appearing twice in one delta lands in the bucket only once.
    #[test]
    fn merge_delta_deduplicates_items() {
        let delta = ContextDelta {
            facts: vec!["same".to_string(); 2],
            ..ContextDelta::default()
        };
        let mut shared = SharedRelayContext::default();
        shared.merge_delta(&delta);
        assert_eq!(shared.facts.len(), 1);
    }

    /// A populated bucket renders as a heading followed by bullet lines.
    #[test]
    fn render_prompt_snapshot_has_sections() {
        let shared = SharedRelayContext {
            facts: vec!["f1".to_string()],
            ..SharedRelayContext::default()
        };
        let rendered = shared.render_prompt_snapshot();
        assert!(rendered.contains("Facts:"));
        assert!(rendered.contains("- f1"));
    }
}