Skip to main content

harness_loop/
memory_layer.rs

1//! Long-term-memory wiring for [`crate::AgentLoop`].
2//!
3//! Two pieces, designed to be installed together:
4//!
5//! - [`MemoryGuide`] — at session start, calls [`Memory::recall`] with the
6//!   current task description and pushes the top-K matches into
7//!   `ctx.guides` as plain text. The model sees a "Relevant prior context"
8//!   section in its system prompt before the very first model call.
9//!
10//! - [`MemoryWriter`] — captures every assistant text turn (via `PostModel`)
11//!   and persists the *last* one as a [`MemoryEntry`] when the run finishes
12//!   (`TaskCompleted`). This turns "this conversation produced an answer"
13//!   into "future sessions can recall the answer".
14//!
15//! Both share an `Arc<dyn Memory>` so a single backend serves recall +
16//! write. The trait is async; the writer hook uses `tokio::spawn` to commit
17//! without blocking the loop.
18//!
19//! ## Wiring
20//!
21//! ```ignore
22//! let mem: Arc<dyn Memory> = Arc::new(FileMemory::open("~/.harness/mem.jsonl")?);
23//! let loop_ = AgentLoop::new(model)
24//!     .with_guide(Arc::new(MemoryGuide::new(mem.clone()).with_top_k(5)))
25//!     .with_hook(Arc::new(MemoryWriter::new(mem)));
26//! ```
27
28use async_trait::async_trait;
29use harness_core::{
30    Block, Context, Event, Execution, Guide, GuideError, GuideId, GuideScope, Hook, HookOutcome,
31    Memory, MemoryEntry, Model, Task, Turn, TurnRole, World,
32};
33use std::sync::{Arc, Mutex, OnceLock};
34
35/// Marker prefix used to identify the recall block in `ctx.guides`. We
36/// strip prior recall blocks on each `apply_before_iter` so the injected
37/// list reflects only the LATEST recall — otherwise ctx.guides grows
38/// unboundedly across iterations.
39const MEMORY_RECALL_MARKER: &str = "[memory-recall]\n";
40
41/// Guide that recalls relevant prior memories and injects them into
42/// `ctx.guides` as a `Block::Text` for the model to see.
43///
44/// Two recall points:
45///
46/// - `apply` (session start): one-shot recall using `ctx.task.description`
47///   as the query. Always fires.
48/// - `apply_before_iter` (every model turn): re-recalls using the **last
49///   user message** as query, replacing the previous recall block. Lets
50///   the recall track topic drift mid-session. No-op when there's no user
51///   message in history (the very first iteration uses the `apply` recall).
52///
53/// Filters (chainable builders, post-recall):
54///
55/// - `with_top_k(k)` — number of candidates to fetch from `Memory::recall`.
56///   Default 5.
57/// - `with_min_score(s)` — drop entries whose recomputed normalised
58///   keyword overlap with the query is below `s`. Default 0.0 (no filter).
59///   Score = `(query_tokens ∩ entry_tokens).len() / query_tokens.len()`.
60/// - `with_required_tags(tags)` — drop entries that don't have ALL these
61///   tags. Default empty (no filter).
62/// - `with_excluded_tags(tags)` — drop entries that have ANY of these
63///   tags. Default empty.
64///
65/// When filters are tight, we over-fetch `top_k * 3` candidates so there's
66/// room to drop without starving the output.
67pub struct MemoryGuide {
68    memory: Arc<dyn Memory>,
69    top_k: usize,
70    min_score: f32,
71    required_tags: Vec<String>,
72    excluded_tags: Vec<String>,
73}
74
75static MEMORY_GUIDE_ID: OnceLock<GuideId> = OnceLock::new();
76static MEMORY_GUIDE_SCOPE: OnceLock<GuideScope> = OnceLock::new();
77
78impl MemoryGuide {
79    /// Construct a guide that recalls up to 5 entries per session.
80    pub fn new(memory: Arc<dyn Memory>) -> Self {
81        Self {
82            memory,
83            top_k: 5,
84            min_score: 0.0,
85            required_tags: Vec::new(),
86            excluded_tags: Vec::new(),
87        }
88    }
89
90    /// Override the number of memories recalled per session. Pick small —
91    /// every recalled line spends prompt tokens.
92    pub fn with_top_k(mut self, k: usize) -> Self {
93        self.top_k = k;
94        self
95    }
96
97    /// Drop entries whose recomputed normalised keyword overlap with the
98    /// query is below `s` ∈ [0, 1]. Default 0.0 (= keep all top_k).
99    ///
100    /// Score formula:
101    /// `(distinct query tokens present in entry.content+tags) / (query token count)`
102    ///
103    /// So a query of 4 tokens needs ≥3 to land in the entry to score ≥ 0.75.
104    pub fn with_min_score(mut self, s: f32) -> Self {
105        self.min_score = s.clamp(0.0, 1.0);
106        self
107    }
108
109    /// Only inject entries that have ALL of these tags. Empty = no filter.
110    pub fn with_required_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
111        self.required_tags = tags.into_iter().map(Into::into).collect();
112        self
113    }
114
115    /// Drop entries that have ANY of these tags. Empty = no filter.
116    pub fn with_excluded_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
117        self.excluded_tags = tags.into_iter().map(Into::into).collect();
118        self
119    }
120
121    /// Inner recall + filter + format pass. Returns the formatted block
122    /// text, or `None` if there's nothing to inject.
123    async fn recall_block(&self, query: &str) -> Option<String> {
124        if self.top_k == 0 || query.trim().is_empty() {
125            return None;
126        }
127        // Over-fetch when filters are active so the post-filter has room.
128        let fetch_k = if self.min_score > 0.0
129            || !self.required_tags.is_empty()
130            || !self.excluded_tags.is_empty()
131        {
132            self.top_k.saturating_mul(3).max(self.top_k)
133        } else {
134            self.top_k
135        };
136        let hits = match self.memory.recall(query, fetch_k).await {
137            Ok(v) => v,
138            Err(e) => {
139                tracing::warn!(error = %e, "memory recall failed; proceeding without it");
140                return None;
141            }
142        };
143        let q_tokens = tokenise_for_score(query);
144        let q_len = q_tokens.len().max(1) as f32;
145
146        let mut kept: Vec<&MemoryEntry> = Vec::new();
147        for e in &hits {
148            // Tag filters first — cheaper than re-scoring.
149            if !self.required_tags.is_empty()
150                && !self.required_tags.iter().all(|t| e.tags.iter().any(|x| x == t))
151            {
152                continue;
153            }
154            if !self.excluded_tags.is_empty()
155                && self.excluded_tags.iter().any(|t| e.tags.iter().any(|x| x == t))
156            {
157                continue;
158            }
159            if self.min_score > 0.0 {
160                let score = recompute_score(&q_tokens, e);
161                if (score / q_len) < self.min_score {
162                    continue;
163                }
164            }
165            kept.push(e);
166            if kept.len() >= self.top_k {
167                break;
168            }
169        }
170        if kept.is_empty() {
171            return None;
172        }
173        let mut lines = String::from(MEMORY_RECALL_MARKER);
174        lines.push_str("Relevant prior context (from your long-term memory):");
175        for (i, e) in kept.iter().enumerate() {
176            lines.push_str(&format!("\n  {}. {}", i + 1, e.content.trim()));
177        }
178        Some(lines)
179    }
180
181    fn remove_previous_recall_block(ctx: &mut Context) {
182        ctx.guides.retain(|b| {
183            !matches!(b, Block::Text(t) if t.starts_with(MEMORY_RECALL_MARKER))
184        });
185    }
186}
187
188/// Pull out the most recent user-role text from `ctx.history`. Used by
189/// `apply_before_iter` to drive the per-turn recall query.
190fn last_user_text(ctx: &Context) -> Option<String> {
191    use harness_core::{Block as B, TurnRole};
192    for turn in ctx.history.iter().rev() {
193        if turn.role != TurnRole::User {
194            continue;
195        }
196        for block in turn.blocks.iter().rev() {
197            if let B::Text(t) = block
198                && !t.trim().is_empty()
199            {
200                return Some(t.clone());
201            }
202        }
203    }
204    None
205}
206
207fn tokenise_for_score(s: &str) -> std::collections::HashSet<String> {
208    s.to_lowercase()
209        .split(|c: char| !c.is_alphanumeric())
210        .filter(|t| t.len() >= 3)
211        .map(String::from)
212        .collect()
213}
214
215fn recompute_score(query_tokens: &std::collections::HashSet<String>, entry: &MemoryEntry) -> f32 {
216    let mut hay = entry.content.to_lowercase();
217    if !entry.tags.is_empty() {
218        hay.push(' ');
219        hay.push_str(&entry.tags.join(" ").to_lowercase());
220    }
221    query_tokens
222        .iter()
223        .filter(|t| hay.contains(t.as_str()))
224        .count() as f32
225}
226
227#[async_trait]
228impl Guide for MemoryGuide {
229    fn id(&self) -> &GuideId {
230        MEMORY_GUIDE_ID.get_or_init(|| "memory-recall".into())
231    }
232    fn kind(&self) -> Execution {
233        // The recall *itself* is computational (keyword match / vector
234        // lookup); the model later infers over the result.
235        Execution::Computational
236    }
237    fn scope(&self) -> &GuideScope {
238        MEMORY_GUIDE_SCOPE.get_or_init(|| GuideScope::Always)
239    }
240    async fn apply(&self, ctx: &mut Context, _w: &World) -> Result<(), GuideError> {
241        Self::remove_previous_recall_block(ctx);
242        if let Some(block) = self.recall_block(&ctx.task.description).await {
243            ctx.guides.push(Block::Text(block));
244        }
245        Ok(())
246    }
247    async fn apply_before_iter(
248        &self,
249        ctx: &mut Context,
250        _w: &World,
251    ) -> Result<(), GuideError> {
252        // Query = latest user message; fall back to task.description on
253        // turn 0 (before any user turn lands in history — though the loop
254        // pushes the task as a user turn before iter 0 so this is rare).
255        let query = last_user_text(ctx).unwrap_or_else(|| ctx.task.description.clone());
256        Self::remove_previous_recall_block(ctx);
257        if let Some(block) = self.recall_block(&query).await {
258            ctx.guides.push(Block::Text(block));
259        }
260        Ok(())
261    }
262}
263
264/// Hook that writes the final assistant text of every successful run back
265/// into long-term memory.
266///
267/// Behaviour:
268/// - On every `PostModel`, captures `out.text` into an internal slot.
269/// - On `TaskCompleted`, takes the most recent captured text and writes it
270///   as a `MemoryEntry` tagged with the source (defaults to `"session"`).
271/// - On `SessionEnd` without a `TaskCompleted` (i.e. `BudgetExhausted`),
272///   nothing is written — partial work shouldn't pollute long-term memory.
273pub struct MemoryWriter {
274    memory: Arc<dyn Memory>,
275    last_text: Mutex<Option<String>>,
276    source: String,
277    tags: Vec<String>,
278}
279
280impl MemoryWriter {
281    pub fn new(memory: Arc<dyn Memory>) -> Self {
282        Self {
283            memory,
284            last_text: Mutex::new(None),
285            source: "session".into(),
286            tags: Vec::new(),
287        }
288    }
289
290    /// Tag every persisted memory with the given source name (e.g.
291    /// `"investor-bot"`, `"personal-assistant"`). Useful for multi-app
292    /// memory stores.
293    pub fn with_source(mut self, source: impl Into<String>) -> Self {
294        self.source = source.into();
295        self
296    }
297
298    pub fn with_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
299        self.tags = tags.into_iter().map(Into::into).collect();
300        self
301    }
302}
303
304impl Hook for MemoryWriter {
305    fn name(&self) -> &str {
306        "memory-writer"
307    }
308    fn matches(&self, ev: &Event<'_>) -> bool {
309        matches!(ev, Event::PostModel { .. } | Event::TaskCompleted)
310    }
311    fn fire(&self, ev: &Event<'_>, _w: &mut World) -> HookOutcome {
312        match ev {
313            Event::PostModel { out } => {
314                if let Some(text) = &out.text
315                    && !text.trim().is_empty()
316                    && let Ok(mut slot) = self.last_text.lock()
317                {
318                    *slot = Some(text.clone());
319                }
320            }
321            Event::TaskCompleted => {
322                let Some(text) = self.last_text.lock().ok().and_then(|mut g| g.take()) else {
323                    return HookOutcome::Allow;
324                };
325                let entry = MemoryEntry::new(text)
326                    .with_source(self.source.clone())
327                    .with_tags(self.tags.clone());
328                let mem = self.memory.clone();
329                // Fire-and-forget: we're inside an async loop, so spawning
330                // is safe and avoids blocking the next iteration.
331                tokio::spawn(async move {
332                    if let Err(e) = mem.write(entry).await {
333                        tracing::warn!(error = %e, "memory write failed");
334                    }
335                });
336            }
337            _ => {}
338        }
339        HookOutcome::Allow
340    }
341}
342
343/// Smarter alternative to [`MemoryWriter`] — distil the session's assistant
344/// turns into 1..=`max_facts` atomic durable facts using a cheap
345/// "synthesizer" model, instead of persisting the verbatim final answer.
346///
347/// Wire either `MemoryWriter` **or** `MemorySynthesizer`, not both —
348/// `MemorySynthesizer` is a superset of the writer's behaviour with the
349/// extra distillation step.
350///
351/// Behaviour:
352/// - On `PostModel`, appends `out.text` (when present, non-empty) to an
353///   internal buffer.
354/// - On `TaskCompleted`, `tokio::spawn`s a synthesis task: calls
355///   `synth_model.complete()` with a fixed prompt that asks for a JSON
356///   array of `{content, tags}` objects, parses the response, and writes
357///   each one via `Memory::write`.
358/// - Model errors / parse failures fall back to saving the raw response
359///   as a single entry tagged `"synth-raw"` so the session's information
360///   isn't lost entirely.
361/// - On `BudgetExhausted` (no `TaskCompleted` fires), nothing is written.
362///
363/// The synth model should be cheap (`deepseek-v4-flash`, `gpt-5-nano`, etc.).
364/// Constructed independently from the main model so you can use a small
365/// summariser even when the reasoning model is large.
366pub struct MemorySynthesizer {
367    memory: Arc<dyn Memory>,
368    synth_model: Arc<dyn Model>,
369    transcripts: Mutex<Vec<String>>,
370    source: String,
371    base_tags: Vec<String>,
372    max_facts: usize,
373    /// App-specific instructions prepended to the synth prompt. Used to
374    /// give the model domain context (e.g. "this is a personal accounting
375    /// app — transactions are already stored, don't repeat flows as facts").
376    extra_instructions: Option<String>,
377    // JoinHandles of spawned synthesis tasks. The agent loop's owner can
378    // `await flush_pending()` before exiting to guarantee that synth
379    // completes before the process tears down its tokio runtime.
380    pending: Mutex<Vec<tokio::task::JoinHandle<()>>>,
381}
382
383impl MemorySynthesizer {
384    /// Construct a synthesizer that uses `synth_model` to distil the
385    /// session into at most 3 facts.
386    pub fn new(memory: Arc<dyn Memory>, synth_model: Arc<dyn Model>) -> Self {
387        Self {
388            memory,
389            synth_model,
390            transcripts: Mutex::new(Vec::new()),
391            source: "session".into(),
392            base_tags: Vec::new(),
393            max_facts: 3,
394            extra_instructions: None,
395            pending: Mutex::new(Vec::new()),
396        }
397    }
398
399    /// Prepend domain-specific guidance to the synthesizer's prompt. The
400    /// extra text shows up BEFORE the standard "extract durable facts"
401    /// instructions, so it sets context for what the model should consider
402    /// durable in this application.
403    ///
404    /// Example for a personal-accounting app:
405    /// ```ignore
406    /// .with_extra_instructions(
407    ///   "This is a personal-accounting agent. Transaction flows like \
408    ///    '¥199 火锅 microwave' are stored in the txns table — do NOT \
409    ///    re-store them as facts. ONLY record: stable user preferences \
410    ///    (payment habits, category conventions), repeated behaviour \
411    ///    patterns (≥2 mentions), or long-term decisions (subscription \
412    ///    cadences, investment policies). If unsure, prefer empty []."
413    /// )
414    /// ```
415    pub fn with_extra_instructions(mut self, instructions: impl Into<String>) -> Self {
416        self.extra_instructions = Some(instructions.into());
417        self
418    }
419
420    /// Await all background synthesis tasks that have been kicked off so
421    /// far. Call this before your process exits if you want to guarantee
422    /// the last session's memory is on disk — otherwise the tokio runtime
423    /// may be dropped while the spawn is mid-flight.
424    pub async fn flush_pending(&self) {
425        let handles: Vec<tokio::task::JoinHandle<()>> = match self.pending.lock() {
426            Ok(mut g) => std::mem::take(&mut *g),
427            Err(_) => return,
428        };
429        for h in handles {
430            let _ = h.await;
431        }
432    }
433
434    pub fn with_source(mut self, source: impl Into<String>) -> Self {
435        self.source = source.into();
436        self
437    }
438
439    pub fn with_base_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
440        self.base_tags = tags.into_iter().map(Into::into).collect();
441        self
442    }
443
444    /// Cap how many facts the synthesizer is allowed to emit. Default 3.
445    pub fn with_max_facts(mut self, n: usize) -> Self {
446        self.max_facts = n.max(1);
447        self
448    }
449}
450
451#[derive(serde::Deserialize)]
452struct SynthFact {
453    #[serde(default)]
454    content: String,
455    #[serde(default)]
456    tags: Vec<String>,
457    /// Optional retention hint emitted by the synth model. `None` = keep
458    /// indefinitely (stable preferences, identity). Finite N = expire after
459    /// N days (one-off project state, session-scoped preferences).
460    #[serde(default)]
461    ttl_days: Option<u32>,
462}
463
464/// Best-effort JSON-array extractor: tolerates markdown code fences and
465/// leading/trailing prose around the JSON body.
466fn extract_facts(raw: &str) -> Option<Vec<SynthFact>> {
467    // Strip ```json ... ``` or ``` ... ``` fences if present.
468    let stripped = raw.trim();
469    let body = if let Some(rest) = stripped.strip_prefix("```json") {
470        rest.trim_start_matches('\n')
471            .rsplit_once("```")
472            .map(|(b, _)| b)
473            .unwrap_or(rest)
474    } else if let Some(rest) = stripped.strip_prefix("```") {
475        rest.trim_start_matches('\n')
476            .rsplit_once("```")
477            .map(|(b, _)| b)
478            .unwrap_or(rest)
479    } else {
480        stripped
481    };
482    // Find first '[' and last ']' — JSON array.
483    let start = body.find('[')?;
484    let end = body.rfind(']')?;
485    if end <= start {
486        return None;
487    }
488    serde_json::from_str::<Vec<SynthFact>>(&body[start..=end]).ok()
489}
490
491impl Hook for MemorySynthesizer {
492    fn name(&self) -> &str {
493        "memory-synthesizer"
494    }
495    fn matches(&self, ev: &Event<'_>) -> bool {
496        matches!(ev, Event::PostModel { .. } | Event::TaskCompleted)
497    }
498    fn fire(&self, ev: &Event<'_>, _w: &mut World) -> HookOutcome {
499        match ev {
500            Event::PostModel { out } => {
501                if let Some(text) = &out.text
502                    && !text.trim().is_empty()
503                    && let Ok(mut buf) = self.transcripts.lock()
504                {
505                    buf.push(text.clone());
506                }
507            }
508            Event::TaskCompleted => {
509                let transcript = match self.transcripts.lock() {
510                    Ok(mut g) => std::mem::take(&mut *g).join("\n\n---\n\n"),
511                    Err(_) => return HookOutcome::Allow,
512                };
513                if transcript.trim().is_empty() {
514                    return HookOutcome::Allow;
515                }
516                let mem = self.memory.clone();
517                let model = self.synth_model.clone();
518                let source = self.source.clone();
519                let base_tags = self.base_tags.clone();
520                let max_facts = self.max_facts;
521                let extra = self.extra_instructions.clone();
522                let handle = tokio::spawn(async move {
523                    distil_and_write(mem, model, source, base_tags, max_facts, extra, transcript)
524                        .await;
525                });
526                if let Ok(mut g) = self.pending.lock() {
527                    g.push(handle);
528                }
529            }
530            _ => {}
531        }
532        HookOutcome::Allow
533    }
534}
535
536async fn distil_and_write(
537    memory: Arc<dyn Memory>,
538    model: Arc<dyn Model>,
539    source: String,
540    base_tags: Vec<String>,
541    max_facts: usize,
542    extra_instructions: Option<String>,
543    transcript: String,
544) {
545    let extra_block = match extra_instructions {
546        Some(s) if !s.trim().is_empty() => format!("\n\n[domain context]\n{s}\n"),
547        _ => String::new(),
548    };
549    let prompt = format!(
550        "Below is the assistant's turns from a completed agent session. \
551         Extract 1 to {max_facts} DURABLE FACTS worth remembering for future sessions \
552         (user preferences, decisions made, key findings, learned constraints — NOT \
553         transient details like timestamps or one-off answers).{extra_block} \
554         \n\nReturn ONLY a JSON array (no prose, no markdown fences) where each item is \
555         {{\"content\": \"<one durable fact, 1-2 sentences>\", \"tags\": [\"<keyword>\", ...], \
556         \"ttl_days\": <integer or null>}}. \
557         `ttl_days` controls how long the fact stays in memory: \
558         `null` = permanent (use for stable preferences, identity, long-term decisions); \
559         `7` = one week (current task / sprint scope); \
560         `30`-`180` = project-scope context; \
561         `1` = ephemeral (rarely useful — prefer omitting facts that are this fleeting). \
562         Use 2-5 lowercase keyword tags per fact for retrieval. \
563         If the session produced nothing durable, return [].\
564         \n\n--- SESSION TRANSCRIPT ---\n{transcript}\n--- END TRANSCRIPT ---"
565    );
566
567    let mut ctx = Context::new(Task {
568        description: prompt.clone(),
569        source: None,
570        deadline: None,
571    });
572    ctx.history.push(Turn {
573        role: TurnRole::User,
574        blocks: vec![Block::Text(prompt)],
575    });
576
577    let out = match model.complete(&ctx).await {
578        Ok(o) => o,
579        Err(e) => {
580            tracing::warn!(error = %e, "memory synth model call failed; nothing persisted");
581            return;
582        }
583    };
584    let raw = out.text.unwrap_or_default();
585
586    let parsed = extract_facts(&raw);
587    if let Some(facts) = parsed.as_ref() {
588        for f in facts.iter().take(max_facts) {
589            let content = f.content.trim().to_string();
590            if content.is_empty() {
591                continue;
592            }
593            let mut tags = base_tags.clone();
594            tags.extend(f.tags.clone());
595            let mut entry = MemoryEntry::new(content)
596                .with_source(source.clone())
597                .with_tags(tags);
598            if let Some(days) = f.ttl_days
599                && days > 0
600            {
601                entry = entry.with_ttl_days(days);
602            }
603            if let Err(e) = memory.write(entry).await {
604                tracing::warn!(error = %e, "memory synth write failed");
605            }
606        }
607    } else if !raw.trim().is_empty() {
608        // Parse genuinely failed (not "model returned []"). Persist the raw
609        // payload tagged "synth-raw" so the operator can grep it later.
610        let mut tags = base_tags;
611        tags.push("synth-raw".into());
612        let entry = MemoryEntry::new(raw.trim().to_string())
613            .with_source(source)
614            .with_tags(tags);
615        if let Err(e) = memory.write(entry).await {
616            tracing::warn!(error = %e, "memory synth-raw write failed");
617        }
618    }
619}
620
621#[cfg(test)]
622mod tests {
623    use super::*;
624    use harness_core::{ModelOutput, StopReason, Usage};
625    use std::sync::atomic::{AtomicU64, Ordering};
626
627    /// Test-only in-memory backend so we don't touch the filesystem.
628    #[derive(Default)]
629    struct VecMemory {
630        store: Mutex<Vec<MemoryEntry>>,
631    }
632    #[async_trait]
633    impl Memory for VecMemory {
634        async fn recall(
635            &self,
636            query: &str,
637            k: usize,
638        ) -> Result<Vec<MemoryEntry>, harness_core::MemoryError> {
639            let g = self.store.lock().unwrap();
640            let q = query.to_lowercase();
641            let mut hits: Vec<MemoryEntry> = g
642                .iter()
643                .filter(|e| {
644                    let hay = e.content.to_lowercase();
645                    q.split_whitespace().any(|t| hay.contains(t))
646                })
647                .cloned()
648                .collect();
649            hits.truncate(k);
650            Ok(hits)
651        }
652        async fn write(&self, entry: MemoryEntry) -> Result<(), harness_core::MemoryError> {
653            self.store.lock().unwrap().push(entry);
654            Ok(())
655        }
656    }
657
658    static SEQ: AtomicU64 = AtomicU64::new(0);
659
660    #[tokio::test]
661    async fn writer_persists_last_text_on_task_completed() {
662        let mem = Arc::new(VecMemory::default());
663        let w = MemoryWriter::new(mem.clone()).with_source("test-app");
664        let mut world = harness_context::default_world(std::env::temp_dir().join(format!(
665            "harness-mw-{}-{}",
666            std::process::id(),
667            SEQ.fetch_add(1, Ordering::SeqCst)
668        )));
669
670        let out = ModelOutput {
671            text: Some("final answer X".into()),
672            tool_calls: vec![],
673            usage: Usage::default(),
674            stop_reason: StopReason::EndTurn,
675            reasoning: None,
676        };
677        let _ = w.fire(&Event::PostModel { out: &out }, &mut world);
678        let _ = w.fire(&Event::TaskCompleted, &mut world);
679
680        // The hook spawns; give the runtime a tick to drain.
681        tokio::task::yield_now().await;
682        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
683
684        let stored = mem.store.lock().unwrap().clone();
685        assert_eq!(stored.len(), 1);
686        assert_eq!(stored[0].content, "final answer X");
687        assert_eq!(stored[0].source.as_deref(), Some("test-app"));
688    }
689
690    #[tokio::test]
691    async fn writer_skips_when_no_task_completed_fires() {
692        let mem = Arc::new(VecMemory::default());
693        let w = MemoryWriter::new(mem.clone());
694        let mut world = harness_context::default_world(std::env::temp_dir().join(format!(
695            "harness-mw-{}-{}",
696            std::process::id(),
697            SEQ.fetch_add(1, Ordering::SeqCst)
698        )));
699
700        let out = ModelOutput {
701            text: Some("partial".into()),
702            tool_calls: vec![],
703            usage: Usage::default(),
704            stop_reason: StopReason::ToolUse,
705            reasoning: None,
706        };
707        let _ = w.fire(&Event::PostModel { out: &out }, &mut world);
708        // No TaskCompleted ⇒ nothing should be written.
709        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
710        assert!(mem.store.lock().unwrap().is_empty());
711    }
712
713    #[tokio::test]
714    async fn synthesizer_parses_clean_json_and_writes_atomic_facts() {
715        use harness_models::{MockModel, MockResponse};
716
717        let mem = Arc::new(VecMemory::default());
718        let synth: Arc<dyn Model> = Arc::new(MockModel::new().script(MockResponse::text(
719            r#"[
720              {"content": "user prefers dark roast coffee, no sugar", "tags": ["coffee", "preferences"]},
721              {"content": "user lives in Beijing (Asia/Shanghai tz)", "tags": ["location", "timezone"]}
722            ]"#,
723        )));
724        let s = MemorySynthesizer::new(mem.clone(), synth).with_source("test");
725        let mut world = harness_context::default_world(std::env::temp_dir().join(format!(
726            "harness-ms-{}-{}",
727            std::process::id(),
728            SEQ.fetch_add(1, Ordering::SeqCst)
729        )));
730
731        let out_a = ModelOutput {
732            text: Some("I'll remember your coffee preference.".into()),
733            tool_calls: vec![],
734            usage: Usage::default(),
735            stop_reason: StopReason::ToolUse,
736            reasoning: None,
737        };
738        let out_b = ModelOutput {
739            text: Some("Setting Beijing as your timezone.".into()),
740            tool_calls: vec![],
741            usage: Usage::default(),
742            stop_reason: StopReason::EndTurn,
743            reasoning: None,
744        };
745        let _ = s.fire(&Event::PostModel { out: &out_a }, &mut world);
746        let _ = s.fire(&Event::PostModel { out: &out_b }, &mut world);
747        let _ = s.fire(&Event::TaskCompleted, &mut world);
748
749        for _ in 0..50 {
750            if mem.store.lock().unwrap().len() >= 2 {
751                break;
752            }
753            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
754        }
755        let stored = mem.store.lock().unwrap().clone();
756        assert_eq!(stored.len(), 2, "expected 2 atomic facts, got {stored:#?}");
757        assert!(stored.iter().any(|e| e.content.contains("dark roast")));
758        assert!(stored.iter().any(|e| e.content.contains("Beijing")));
759        let coffee = stored
760            .iter()
761            .find(|e| e.content.contains("dark roast"))
762            .unwrap();
763        assert!(coffee.tags.contains(&"coffee".to_string()));
764        assert_eq!(coffee.source.as_deref(), Some("test"));
765    }
766
767    #[tokio::test]
768    async fn synthesizer_strips_markdown_fences_around_json() {
769        use harness_models::{MockModel, MockResponse};
770
771        let mem = Arc::new(VecMemory::default());
772        let synth: Arc<dyn Model> = Arc::new(MockModel::new().script(MockResponse::text(
773            "Here are the facts:\n```json\n[{\"content\":\"fact one\",\"tags\":[\"x\"]}]\n```\n",
774        )));
775        let s = MemorySynthesizer::new(mem.clone(), synth);
776        let mut world = harness_context::default_world(std::env::temp_dir());
777
778        let out = ModelOutput {
779            text: Some("some chat".into()),
780            tool_calls: vec![],
781            usage: Usage::default(),
782            stop_reason: StopReason::EndTurn,
783            reasoning: None,
784        };
785        let _ = s.fire(&Event::PostModel { out: &out }, &mut world);
786        let _ = s.fire(&Event::TaskCompleted, &mut world);
787
788        for _ in 0..50 {
789            if !mem.store.lock().unwrap().is_empty() {
790                break;
791            }
792            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
793        }
794        let stored = mem.store.lock().unwrap().clone();
795        assert_eq!(stored.len(), 1);
796        assert_eq!(stored[0].content, "fact one");
797    }
798
799    #[tokio::test]
800    async fn synthesizer_empty_array_persists_nothing() {
801        // Regression: model correctly returns "[]" meaning "no durable
802        // facts to extract". This must NOT fall through to the synth-raw
803        // fallback (which would store the literal "[]" as a memory row).
804        use harness_models::{MockModel, MockResponse};
805
806        let mem = Arc::new(VecMemory::default());
807        let synth: Arc<dyn Model> = Arc::new(MockModel::new().script(MockResponse::text("[]")));
808        let s = MemorySynthesizer::new(mem.clone(), synth);
809        let mut world = harness_context::default_world(std::env::temp_dir());
810
811        let out = ModelOutput {
812            text: Some("fluff".into()),
813            tool_calls: vec![],
814            usage: Usage::default(),
815            stop_reason: StopReason::EndTurn,
816            reasoning: None,
817        };
818        let _ = s.fire(&Event::PostModel { out: &out }, &mut world);
819        let _ = s.fire(&Event::TaskCompleted, &mut world);
820
821        // Give the spawned synth task time to run.
822        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
823        let stored = mem.store.lock().unwrap().clone();
824        assert!(stored.is_empty(), "expected nothing stored, got {stored:?}");
825    }
826
827    #[tokio::test]
828    async fn synthesizer_falls_back_to_synth_raw_when_json_unparseable() {
829        use harness_models::{MockModel, MockResponse};
830
831        let mem = Arc::new(VecMemory::default());
832        let synth: Arc<dyn Model> = Arc::new(MockModel::new().script(MockResponse::text(
833            "The user said they like coffee. I think that's important.",
834        )));
835        let s = MemorySynthesizer::new(mem.clone(), synth);
836        let mut world = harness_context::default_world(std::env::temp_dir());
837
838        let out = ModelOutput {
839            text: Some("session chat".into()),
840            tool_calls: vec![],
841            usage: Usage::default(),
842            stop_reason: StopReason::EndTurn,
843            reasoning: None,
844        };
845        let _ = s.fire(&Event::PostModel { out: &out }, &mut world);
846        let _ = s.fire(&Event::TaskCompleted, &mut world);
847
848        for _ in 0..50 {
849            if !mem.store.lock().unwrap().is_empty() {
850                break;
851            }
852            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
853        }
854        let stored = mem.store.lock().unwrap().clone();
855        assert_eq!(stored.len(), 1);
856        assert!(stored[0].tags.contains(&"synth-raw".to_string()));
857        assert!(stored[0].content.contains("coffee"));
858    }
859}