Skip to main content

harness_loop/
memory_layer.rs

1//! Long-term-memory wiring for [`crate::AgentLoop`].
2//!
3//! Two pieces, designed to be installed together:
4//!
5//! - [`MemoryGuide`] — at session start, calls [`Memory::recall`] with the
6//!   current task description and pushes the top-K matches into
7//!   `ctx.guides` as plain text. The model sees a "Relevant prior context"
8//!   section in its system prompt before the very first model call.
9//!
10//! - [`MemoryWriter`] — captures every assistant text turn (via `PostModel`)
11//!   and persists the *last* one as a [`MemoryEntry`] when the run finishes
12//!   (`TaskCompleted`). This turns "this conversation produced an answer"
13//!   into "future sessions can recall the answer".
14//!
15//! Both share an `Arc<dyn Memory>` so a single backend serves recall +
16//! write. The trait is async; the writer hook uses `tokio::spawn` to commit
17//! without blocking the loop.
18//!
19//! ## Wiring
20//!
21//! ```ignore
22//! let mem: Arc<dyn Memory> = Arc::new(FileMemory::open("~/.harness/mem.jsonl")?);
23//! let loop_ = AgentLoop::new(model)
24//!     .with_guide(Arc::new(MemoryGuide::new(mem.clone()).with_top_k(5)))
25//!     .with_hook(Arc::new(MemoryWriter::new(mem)));
26//! ```
27
28use async_trait::async_trait;
29use harness_core::{
30    Block, Context, Event, Execution, Guide, GuideError, GuideId, GuideScope, Hook, HookOutcome,
31    Memory, MemoryEntry, Model, Task, Turn, TurnRole, World,
32};
33use std::sync::{Arc, Mutex, OnceLock};
34
35/// Marker prefix used to identify the recall block in `ctx.guides`. We
36/// strip prior recall blocks on each `apply_before_iter` so the injected
37/// list reflects only the LATEST recall — otherwise ctx.guides grows
38/// unboundedly across iterations.
39const MEMORY_RECALL_MARKER: &str = "[memory-recall]\n";
40
41/// Guide that recalls relevant prior memories and injects them into
42/// `ctx.guides` as a `Block::Text` for the model to see.
43///
44/// Two recall points:
45///
46/// - `apply` (session start): one-shot recall using `ctx.task.description`
47///   as the query. Always fires.
48/// - `apply_before_iter` (every model turn): re-recalls using the **last
49///   user message** as query, replacing the previous recall block. Lets
50///   the recall track topic drift mid-session. No-op when there's no user
51///   message in history (the very first iteration uses the `apply` recall).
52///
53/// Filters (chainable builders, post-recall):
54///
55/// - `with_top_k(k)` — number of candidates to fetch from `Memory::recall`.
56///   Default 5.
57/// - `with_min_score(s)` — drop entries whose recomputed normalised
58///   keyword overlap with the query is below `s`. Default 0.0 (no filter).
59///   Score = `(query_tokens ∩ entry_tokens).len() / query_tokens.len()`.
60/// - `with_required_tags(tags)` — drop entries that don't have ALL these
61///   tags. Default empty (no filter).
62/// - `with_excluded_tags(tags)` — drop entries that have ANY of these
63///   tags. Default empty.
64///
65/// When filters are tight, we over-fetch `top_k * 3` candidates so there's
66/// room to drop without starving the output.
67pub struct MemoryGuide {
68    memory: Arc<dyn Memory>,
69    top_k: usize,
70    min_score: f32,
71    required_tags: Vec<String>,
72    excluded_tags: Vec<String>,
73}
74
75static MEMORY_GUIDE_ID: OnceLock<GuideId> = OnceLock::new();
76static MEMORY_GUIDE_SCOPE: OnceLock<GuideScope> = OnceLock::new();
77
78impl MemoryGuide {
79    /// Construct a guide that recalls up to 5 entries per session.
80    pub fn new(memory: Arc<dyn Memory>) -> Self {
81        Self {
82            memory,
83            top_k: 5,
84            min_score: 0.0,
85            required_tags: Vec::new(),
86            excluded_tags: Vec::new(),
87        }
88    }
89
90    /// Override the number of memories recalled per session. Pick small —
91    /// every recalled line spends prompt tokens.
92    pub fn with_top_k(mut self, k: usize) -> Self {
93        self.top_k = k;
94        self
95    }
96
97    /// Drop entries whose recomputed normalised keyword overlap with the
98    /// query is below `s` ∈ [0, 1]. Default 0.0 (= keep all top_k).
99    ///
100    /// Score formula:
101    /// `(distinct query tokens present in entry.content+tags) / (query token count)`
102    ///
103    /// So a query of 4 tokens needs ≥3 to land in the entry to score ≥ 0.75.
104    pub fn with_min_score(mut self, s: f32) -> Self {
105        self.min_score = s.clamp(0.0, 1.0);
106        self
107    }
108
109    /// Only inject entries that have ALL of these tags. Empty = no filter.
110    pub fn with_required_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
111        self.required_tags = tags.into_iter().map(Into::into).collect();
112        self
113    }
114
115    /// Drop entries that have ANY of these tags. Empty = no filter.
116    pub fn with_excluded_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
117        self.excluded_tags = tags.into_iter().map(Into::into).collect();
118        self
119    }
120
121    /// Inner recall + filter + format pass. Returns the formatted block
122    /// text, or `None` if there's nothing to inject.
123    async fn recall_block(&self, query: &str) -> Option<String> {
124        if self.top_k == 0 || query.trim().is_empty() {
125            return None;
126        }
127        // Over-fetch when filters are active so the post-filter has room.
128        let fetch_k = if self.min_score > 0.0
129            || !self.required_tags.is_empty()
130            || !self.excluded_tags.is_empty()
131        {
132            self.top_k.saturating_mul(3).max(self.top_k)
133        } else {
134            self.top_k
135        };
136        let hits = match self.memory.recall(query, fetch_k).await {
137            Ok(v) => v,
138            Err(e) => {
139                tracing::warn!(error = %e, "memory recall failed; proceeding without it");
140                return None;
141            }
142        };
143        let q_tokens = tokenise_for_score(query);
144        let q_len = q_tokens.len().max(1) as f32;
145
146        let mut kept: Vec<&MemoryEntry> = Vec::new();
147        for e in &hits {
148            // Tag filters first — cheaper than re-scoring.
149            if !self.required_tags.is_empty()
150                && !self
151                    .required_tags
152                    .iter()
153                    .all(|t| e.tags.iter().any(|x| x == t))
154            {
155                continue;
156            }
157            if !self.excluded_tags.is_empty()
158                && self
159                    .excluded_tags
160                    .iter()
161                    .any(|t| e.tags.iter().any(|x| x == t))
162            {
163                continue;
164            }
165            if self.min_score > 0.0 {
166                let score = recompute_score(&q_tokens, e);
167                if (score / q_len) < self.min_score {
168                    continue;
169                }
170            }
171            kept.push(e);
172            if kept.len() >= self.top_k {
173                break;
174            }
175        }
176        if kept.is_empty() {
177            return None;
178        }
179        let mut lines = String::from(MEMORY_RECALL_MARKER);
180        lines.push_str("Relevant prior context (from your long-term memory):");
181        for (i, e) in kept.iter().enumerate() {
182            lines.push_str(&format!("\n  {}. {}", i + 1, e.content.trim()));
183        }
184        Some(lines)
185    }
186
187    fn remove_previous_recall_block(ctx: &mut Context) {
188        ctx.guides
189            .retain(|b| !matches!(b, Block::Text(t) if t.starts_with(MEMORY_RECALL_MARKER)));
190    }
191}
192
193/// Pull out the most recent user-role text from `ctx.history`. Used by
194/// `apply_before_iter` to drive the per-turn recall query.
195fn last_user_text(ctx: &Context) -> Option<String> {
196    use harness_core::{Block as B, TurnRole};
197    for turn in ctx.history.iter().rev() {
198        if turn.role != TurnRole::User {
199            continue;
200        }
201        for block in turn.blocks.iter().rev() {
202            if let B::Text(t) = block
203                && !t.trim().is_empty()
204            {
205                return Some(t.clone());
206            }
207        }
208    }
209    None
210}
211
212fn tokenise_for_score(s: &str) -> std::collections::HashSet<String> {
213    s.to_lowercase()
214        .split(|c: char| !c.is_alphanumeric())
215        .filter(|t| t.len() >= 3)
216        .map(String::from)
217        .collect()
218}
219
220fn recompute_score(query_tokens: &std::collections::HashSet<String>, entry: &MemoryEntry) -> f32 {
221    let mut hay = entry.content.to_lowercase();
222    if !entry.tags.is_empty() {
223        hay.push(' ');
224        hay.push_str(&entry.tags.join(" ").to_lowercase());
225    }
226    query_tokens
227        .iter()
228        .filter(|t| hay.contains(t.as_str()))
229        .count() as f32
230}
231
232#[async_trait]
233impl Guide for MemoryGuide {
234    fn id(&self) -> &GuideId {
235        MEMORY_GUIDE_ID.get_or_init(|| "memory-recall".into())
236    }
237    fn kind(&self) -> Execution {
238        // The recall *itself* is computational (keyword match / vector
239        // lookup); the model later infers over the result.
240        Execution::Computational
241    }
242    fn scope(&self) -> &GuideScope {
243        MEMORY_GUIDE_SCOPE.get_or_init(|| GuideScope::Always)
244    }
245    async fn apply(&self, ctx: &mut Context, _w: &World) -> Result<(), GuideError> {
246        Self::remove_previous_recall_block(ctx);
247        if let Some(block) = self.recall_block(&ctx.task.description).await {
248            ctx.guides.push(Block::Text(block));
249        }
250        Ok(())
251    }
252    async fn apply_before_iter(&self, ctx: &mut Context, _w: &World) -> Result<(), GuideError> {
253        // Query = latest user message; fall back to task.description on
254        // turn 0 (before any user turn lands in history — though the loop
255        // pushes the task as a user turn before iter 0 so this is rare).
256        let query = last_user_text(ctx).unwrap_or_else(|| ctx.task.description.clone());
257        Self::remove_previous_recall_block(ctx);
258        if let Some(block) = self.recall_block(&query).await {
259            ctx.guides.push(Block::Text(block));
260        }
261        Ok(())
262    }
263}
264
265/// Hook that writes the final assistant text of every successful run back
266/// into long-term memory.
267///
268/// Behaviour:
269/// - On every `PostModel`, captures `out.text` into an internal slot.
270/// - On `TaskCompleted`, takes the most recent captured text and writes it
271///   as a `MemoryEntry` tagged with the source (defaults to `"session"`).
272/// - On `SessionEnd` without a `TaskCompleted` (i.e. `BudgetExhausted`),
273///   nothing is written — partial work shouldn't pollute long-term memory.
274pub struct MemoryWriter {
275    memory: Arc<dyn Memory>,
276    last_text: Mutex<Option<String>>,
277    source: String,
278    tags: Vec<String>,
279}
280
281impl MemoryWriter {
282    pub fn new(memory: Arc<dyn Memory>) -> Self {
283        Self {
284            memory,
285            last_text: Mutex::new(None),
286            source: "session".into(),
287            tags: Vec::new(),
288        }
289    }
290
291    /// Tag every persisted memory with the given source name (e.g.
292    /// `"investor-bot"`, `"personal-assistant"`). Useful for multi-app
293    /// memory stores.
294    pub fn with_source(mut self, source: impl Into<String>) -> Self {
295        self.source = source.into();
296        self
297    }
298
299    pub fn with_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
300        self.tags = tags.into_iter().map(Into::into).collect();
301        self
302    }
303}
304
305impl Hook for MemoryWriter {
306    fn name(&self) -> &str {
307        "memory-writer"
308    }
309    fn matches(&self, ev: &Event<'_>) -> bool {
310        matches!(ev, Event::PostModel { .. } | Event::TaskCompleted)
311    }
312    fn fire(&self, ev: &Event<'_>, _w: &mut World) -> HookOutcome {
313        match ev {
314            Event::PostModel { out } => {
315                if let Some(text) = &out.text
316                    && !text.trim().is_empty()
317                    && let Ok(mut slot) = self.last_text.lock()
318                {
319                    *slot = Some(text.clone());
320                }
321            }
322            Event::TaskCompleted => {
323                let Some(text) = self.last_text.lock().ok().and_then(|mut g| g.take()) else {
324                    return HookOutcome::Allow;
325                };
326                let entry = MemoryEntry::new(text)
327                    .with_source(self.source.clone())
328                    .with_tags(self.tags.clone());
329                let mem = self.memory.clone();
330                // Fire-and-forget: we're inside an async loop, so spawning
331                // is safe and avoids blocking the next iteration.
332                tokio::spawn(async move {
333                    if let Err(e) = mem.write(entry).await {
334                        tracing::warn!(error = %e, "memory write failed");
335                    }
336                });
337            }
338            _ => {}
339        }
340        HookOutcome::Allow
341    }
342}
343
344/// Smarter alternative to [`MemoryWriter`] — distil the session's assistant
345/// turns into 1..=`max_facts` atomic durable facts using a cheap
346/// "synthesizer" model, instead of persisting the verbatim final answer.
347///
348/// Wire either `MemoryWriter` **or** `MemorySynthesizer`, not both —
349/// `MemorySynthesizer` is a superset of the writer's behaviour with the
350/// extra distillation step.
351///
352/// Behaviour:
353/// - On `PostModel`, appends `out.text` (when present, non-empty) to an
354///   internal buffer.
355/// - On `TaskCompleted`, `tokio::spawn`s a synthesis task: calls
356///   `synth_model.complete()` with a fixed prompt that asks for a JSON
357///   array of `{content, tags}` objects, parses the response, and writes
358///   each one via `Memory::write`.
359/// - Model errors / parse failures fall back to saving the raw response
360///   as a single entry tagged `"synth-raw"` so the session's information
361///   isn't lost entirely.
362/// - On `BudgetExhausted` (no `TaskCompleted` fires), nothing is written.
363///
364/// The synth model should be cheap (`deepseek-v4-flash`, `gpt-5-nano`, etc.).
365/// Constructed independently from the main model so you can use a small
366/// summariser even when the reasoning model is large.
367pub struct MemorySynthesizer {
368    memory: Arc<dyn Memory>,
369    synth_model: Arc<dyn Model>,
370    transcripts: Mutex<Vec<String>>,
371    source: String,
372    base_tags: Vec<String>,
373    max_facts: usize,
374    /// App-specific instructions prepended to the synth prompt. Used to
375    /// give the model domain context (e.g. "this is a personal accounting
376    /// app — transactions are already stored, don't repeat flows as facts").
377    extra_instructions: Option<String>,
378    // JoinHandles of spawned synthesis tasks. The agent loop's owner can
379    // `await flush_pending()` before exiting to guarantee that synth
380    // completes before the process tears down its tokio runtime.
381    pending: Mutex<Vec<tokio::task::JoinHandle<()>>>,
382}
383
384impl MemorySynthesizer {
385    /// Construct a synthesizer that uses `synth_model` to distil the
386    /// session into at most 3 facts.
387    pub fn new(memory: Arc<dyn Memory>, synth_model: Arc<dyn Model>) -> Self {
388        Self {
389            memory,
390            synth_model,
391            transcripts: Mutex::new(Vec::new()),
392            source: "session".into(),
393            base_tags: Vec::new(),
394            max_facts: 3,
395            extra_instructions: None,
396            pending: Mutex::new(Vec::new()),
397        }
398    }
399
400    /// Prepend domain-specific guidance to the synthesizer's prompt. The
401    /// extra text shows up BEFORE the standard "extract durable facts"
402    /// instructions, so it sets context for what the model should consider
403    /// durable in this application.
404    ///
405    /// Example for a personal-accounting app:
406    /// ```ignore
407    /// .with_extra_instructions(
408    ///   "This is a personal-accounting agent. Transaction flows like \
409    ///    '¥199 火锅 microwave' are stored in the txns table — do NOT \
410    ///    re-store them as facts. ONLY record: stable user preferences \
411    ///    (payment habits, category conventions), repeated behaviour \
412    ///    patterns (≥2 mentions), or long-term decisions (subscription \
413    ///    cadences, investment policies). If unsure, prefer empty []."
414    /// )
415    /// ```
416    pub fn with_extra_instructions(mut self, instructions: impl Into<String>) -> Self {
417        self.extra_instructions = Some(instructions.into());
418        self
419    }
420
421    /// Await all background synthesis tasks that have been kicked off so
422    /// far. Call this before your process exits if you want to guarantee
423    /// the last session's memory is on disk — otherwise the tokio runtime
424    /// may be dropped while the spawn is mid-flight.
425    pub async fn flush_pending(&self) {
426        let handles: Vec<tokio::task::JoinHandle<()>> = match self.pending.lock() {
427            Ok(mut g) => std::mem::take(&mut *g),
428            Err(_) => return,
429        };
430        for h in handles {
431            let _ = h.await;
432        }
433    }
434
435    pub fn with_source(mut self, source: impl Into<String>) -> Self {
436        self.source = source.into();
437        self
438    }
439
440    pub fn with_base_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
441        self.base_tags = tags.into_iter().map(Into::into).collect();
442        self
443    }
444
445    /// Cap how many facts the synthesizer is allowed to emit. Default 3.
446    pub fn with_max_facts(mut self, n: usize) -> Self {
447        self.max_facts = n.max(1);
448        self
449    }
450}
451
452#[derive(serde::Deserialize)]
453struct SynthFact {
454    #[serde(default)]
455    content: String,
456    #[serde(default)]
457    tags: Vec<String>,
458    /// Optional retention hint emitted by the synth model. `None` = keep
459    /// indefinitely (stable preferences, identity). Finite N = expire after
460    /// N days (one-off project state, session-scoped preferences).
461    #[serde(default)]
462    ttl_days: Option<u32>,
463}
464
465/// Best-effort JSON-array extractor: tolerates markdown code fences and
466/// leading/trailing prose around the JSON body.
467fn extract_facts(raw: &str) -> Option<Vec<SynthFact>> {
468    // Strip ```json ... ``` or ``` ... ``` fences if present.
469    let stripped = raw.trim();
470    let body = if let Some(rest) = stripped.strip_prefix("```json") {
471        rest.trim_start_matches('\n')
472            .rsplit_once("```")
473            .map(|(b, _)| b)
474            .unwrap_or(rest)
475    } else if let Some(rest) = stripped.strip_prefix("```") {
476        rest.trim_start_matches('\n')
477            .rsplit_once("```")
478            .map(|(b, _)| b)
479            .unwrap_or(rest)
480    } else {
481        stripped
482    };
483    // Find first '[' and last ']' — JSON array.
484    let start = body.find('[')?;
485    let end = body.rfind(']')?;
486    if end <= start {
487        return None;
488    }
489    serde_json::from_str::<Vec<SynthFact>>(&body[start..=end]).ok()
490}
491
492impl Hook for MemorySynthesizer {
493    fn name(&self) -> &str {
494        "memory-synthesizer"
495    }
496    fn matches(&self, ev: &Event<'_>) -> bool {
497        matches!(ev, Event::PostModel { .. } | Event::TaskCompleted)
498    }
499    fn fire(&self, ev: &Event<'_>, _w: &mut World) -> HookOutcome {
500        match ev {
501            Event::PostModel { out } => {
502                if let Some(text) = &out.text
503                    && !text.trim().is_empty()
504                    && let Ok(mut buf) = self.transcripts.lock()
505                {
506                    buf.push(text.clone());
507                }
508            }
509            Event::TaskCompleted => {
510                let transcript = match self.transcripts.lock() {
511                    Ok(mut g) => std::mem::take(&mut *g).join("\n\n---\n\n"),
512                    Err(_) => return HookOutcome::Allow,
513                };
514                if transcript.trim().is_empty() {
515                    return HookOutcome::Allow;
516                }
517                let mem = self.memory.clone();
518                let model = self.synth_model.clone();
519                let source = self.source.clone();
520                let base_tags = self.base_tags.clone();
521                let max_facts = self.max_facts;
522                let extra = self.extra_instructions.clone();
523                let handle = tokio::spawn(async move {
524                    distil_and_write(mem, model, source, base_tags, max_facts, extra, transcript)
525                        .await;
526                });
527                if let Ok(mut g) = self.pending.lock() {
528                    g.push(handle);
529                }
530            }
531            _ => {}
532        }
533        HookOutcome::Allow
534    }
535}
536
537async fn distil_and_write(
538    memory: Arc<dyn Memory>,
539    model: Arc<dyn Model>,
540    source: String,
541    base_tags: Vec<String>,
542    max_facts: usize,
543    extra_instructions: Option<String>,
544    transcript: String,
545) {
546    let extra_block = match extra_instructions {
547        Some(s) if !s.trim().is_empty() => format!("\n\n[domain context]\n{s}\n"),
548        _ => String::new(),
549    };
550    let prompt = format!(
551        "Below is the assistant's turns from a completed agent session. \
552         Extract 1 to {max_facts} DURABLE FACTS worth remembering for future sessions \
553         (user preferences, decisions made, key findings, learned constraints — NOT \
554         transient details like timestamps or one-off answers).{extra_block} \
555         \n\nReturn ONLY a JSON array (no prose, no markdown fences) where each item is \
556         {{\"content\": \"<one durable fact, 1-2 sentences>\", \"tags\": [\"<keyword>\", ...], \
557         \"ttl_days\": <integer or null>}}. \
558         `ttl_days` controls how long the fact stays in memory: \
559         `null` = permanent (use for stable preferences, identity, long-term decisions); \
560         `7` = one week (current task / sprint scope); \
561         `30`-`180` = project-scope context; \
562         `1` = ephemeral (rarely useful — prefer omitting facts that are this fleeting). \
563         Use 2-5 lowercase keyword tags per fact for retrieval. \
564         If the session produced nothing durable, return [].\
565         \n\n--- SESSION TRANSCRIPT ---\n{transcript}\n--- END TRANSCRIPT ---"
566    );
567
568    let mut ctx = Context::new(Task {
569        description: prompt.clone(),
570        source: None,
571        deadline: None,
572    });
573    ctx.history.push(Turn {
574        role: TurnRole::User,
575        blocks: vec![Block::Text(prompt)],
576    });
577
578    let out = match model.complete(&ctx).await {
579        Ok(o) => o,
580        Err(e) => {
581            tracing::warn!(error = %e, "memory synth model call failed; nothing persisted");
582            return;
583        }
584    };
585    let raw = out.text.unwrap_or_default();
586
587    let parsed = extract_facts(&raw);
588    if let Some(facts) = parsed.as_ref() {
589        for f in facts.iter().take(max_facts) {
590            let content = f.content.trim().to_string();
591            if content.is_empty() {
592                continue;
593            }
594            let mut tags = base_tags.clone();
595            tags.extend(f.tags.clone());
596            let mut entry = MemoryEntry::new(content)
597                .with_source(source.clone())
598                .with_tags(tags);
599            if let Some(days) = f.ttl_days
600                && days > 0
601            {
602                entry = entry.with_ttl_days(days);
603            }
604            if let Err(e) = memory.write(entry).await {
605                tracing::warn!(error = %e, "memory synth write failed");
606            }
607        }
608    } else if !raw.trim().is_empty() {
609        // Parse genuinely failed (not "model returned []"). Persist the raw
610        // payload tagged "synth-raw" so the operator can grep it later.
611        let mut tags = base_tags;
612        tags.push("synth-raw".into());
613        let entry = MemoryEntry::new(raw.trim().to_string())
614            .with_source(source)
615            .with_tags(tags);
616        if let Err(e) = memory.write(entry).await {
617            tracing::warn!(error = %e, "memory synth-raw write failed");
618        }
619    }
620}
621
622#[cfg(test)]
623mod tests {
624    use super::*;
625    use harness_core::{ModelOutput, StopReason, Usage};
626    use std::sync::atomic::{AtomicU64, Ordering};
627
628    /// Test-only in-memory backend so we don't touch the filesystem.
629    #[derive(Default)]
630    struct VecMemory {
631        store: Mutex<Vec<MemoryEntry>>,
632    }
633    #[async_trait]
634    impl Memory for VecMemory {
635        async fn recall(
636            &self,
637            query: &str,
638            k: usize,
639        ) -> Result<Vec<MemoryEntry>, harness_core::MemoryError> {
640            let g = self.store.lock().unwrap();
641            let q = query.to_lowercase();
642            let mut hits: Vec<MemoryEntry> = g
643                .iter()
644                .filter(|e| {
645                    let hay = e.content.to_lowercase();
646                    q.split_whitespace().any(|t| hay.contains(t))
647                })
648                .cloned()
649                .collect();
650            hits.truncate(k);
651            Ok(hits)
652        }
653        async fn write(&self, entry: MemoryEntry) -> Result<(), harness_core::MemoryError> {
654            self.store.lock().unwrap().push(entry);
655            Ok(())
656        }
657    }
658
659    static SEQ: AtomicU64 = AtomicU64::new(0);
660
661    #[tokio::test]
662    async fn writer_persists_last_text_on_task_completed() {
663        let mem = Arc::new(VecMemory::default());
664        let w = MemoryWriter::new(mem.clone()).with_source("test-app");
665        let mut world = harness_context::default_world(std::env::temp_dir().join(format!(
666            "harness-mw-{}-{}",
667            std::process::id(),
668            SEQ.fetch_add(1, Ordering::SeqCst)
669        )));
670
671        let out = ModelOutput {
672            text: Some("final answer X".into()),
673            tool_calls: vec![],
674            usage: Usage::default(),
675            stop_reason: StopReason::EndTurn,
676            reasoning: None,
677        };
678        let _ = w.fire(&Event::PostModel { out: &out }, &mut world);
679        let _ = w.fire(&Event::TaskCompleted, &mut world);
680
681        // The hook spawns; give the runtime a tick to drain.
682        tokio::task::yield_now().await;
683        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
684
685        let stored = mem.store.lock().unwrap().clone();
686        assert_eq!(stored.len(), 1);
687        assert_eq!(stored[0].content, "final answer X");
688        assert_eq!(stored[0].source.as_deref(), Some("test-app"));
689    }
690
691    #[tokio::test]
692    async fn writer_skips_when_no_task_completed_fires() {
693        let mem = Arc::new(VecMemory::default());
694        let w = MemoryWriter::new(mem.clone());
695        let mut world = harness_context::default_world(std::env::temp_dir().join(format!(
696            "harness-mw-{}-{}",
697            std::process::id(),
698            SEQ.fetch_add(1, Ordering::SeqCst)
699        )));
700
701        let out = ModelOutput {
702            text: Some("partial".into()),
703            tool_calls: vec![],
704            usage: Usage::default(),
705            stop_reason: StopReason::ToolUse,
706            reasoning: None,
707        };
708        let _ = w.fire(&Event::PostModel { out: &out }, &mut world);
709        // No TaskCompleted ⇒ nothing should be written.
710        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
711        assert!(mem.store.lock().unwrap().is_empty());
712    }
713
714    #[tokio::test]
715    async fn synthesizer_parses_clean_json_and_writes_atomic_facts() {
716        use harness_models::{MockModel, MockResponse};
717
718        let mem = Arc::new(VecMemory::default());
719        let synth: Arc<dyn Model> = Arc::new(MockModel::new().script(MockResponse::text(
720            r#"[
721              {"content": "user prefers dark roast coffee, no sugar", "tags": ["coffee", "preferences"]},
722              {"content": "user lives in Beijing (Asia/Shanghai tz)", "tags": ["location", "timezone"]}
723            ]"#,
724        )));
725        let s = MemorySynthesizer::new(mem.clone(), synth).with_source("test");
726        let mut world = harness_context::default_world(std::env::temp_dir().join(format!(
727            "harness-ms-{}-{}",
728            std::process::id(),
729            SEQ.fetch_add(1, Ordering::SeqCst)
730        )));
731
732        let out_a = ModelOutput {
733            text: Some("I'll remember your coffee preference.".into()),
734            tool_calls: vec![],
735            usage: Usage::default(),
736            stop_reason: StopReason::ToolUse,
737            reasoning: None,
738        };
739        let out_b = ModelOutput {
740            text: Some("Setting Beijing as your timezone.".into()),
741            tool_calls: vec![],
742            usage: Usage::default(),
743            stop_reason: StopReason::EndTurn,
744            reasoning: None,
745        };
746        let _ = s.fire(&Event::PostModel { out: &out_a }, &mut world);
747        let _ = s.fire(&Event::PostModel { out: &out_b }, &mut world);
748        let _ = s.fire(&Event::TaskCompleted, &mut world);
749
750        for _ in 0..50 {
751            if mem.store.lock().unwrap().len() >= 2 {
752                break;
753            }
754            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
755        }
756        let stored = mem.store.lock().unwrap().clone();
757        assert_eq!(stored.len(), 2, "expected 2 atomic facts, got {stored:#?}");
758        assert!(stored.iter().any(|e| e.content.contains("dark roast")));
759        assert!(stored.iter().any(|e| e.content.contains("Beijing")));
760        let coffee = stored
761            .iter()
762            .find(|e| e.content.contains("dark roast"))
763            .unwrap();
764        assert!(coffee.tags.contains(&"coffee".to_string()));
765        assert_eq!(coffee.source.as_deref(), Some("test"));
766    }
767
768    #[tokio::test]
769    async fn synthesizer_strips_markdown_fences_around_json() {
770        use harness_models::{MockModel, MockResponse};
771
772        let mem = Arc::new(VecMemory::default());
773        let synth: Arc<dyn Model> = Arc::new(MockModel::new().script(MockResponse::text(
774            "Here are the facts:\n```json\n[{\"content\":\"fact one\",\"tags\":[\"x\"]}]\n```\n",
775        )));
776        let s = MemorySynthesizer::new(mem.clone(), synth);
777        let mut world = harness_context::default_world(std::env::temp_dir());
778
779        let out = ModelOutput {
780            text: Some("some chat".into()),
781            tool_calls: vec![],
782            usage: Usage::default(),
783            stop_reason: StopReason::EndTurn,
784            reasoning: None,
785        };
786        let _ = s.fire(&Event::PostModel { out: &out }, &mut world);
787        let _ = s.fire(&Event::TaskCompleted, &mut world);
788
789        for _ in 0..50 {
790            if !mem.store.lock().unwrap().is_empty() {
791                break;
792            }
793            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
794        }
795        let stored = mem.store.lock().unwrap().clone();
796        assert_eq!(stored.len(), 1);
797        assert_eq!(stored[0].content, "fact one");
798    }
799
800    #[tokio::test]
801    async fn synthesizer_empty_array_persists_nothing() {
802        // Regression: model correctly returns "[]" meaning "no durable
803        // facts to extract". This must NOT fall through to the synth-raw
804        // fallback (which would store the literal "[]" as a memory row).
805        use harness_models::{MockModel, MockResponse};
806
807        let mem = Arc::new(VecMemory::default());
808        let synth: Arc<dyn Model> = Arc::new(MockModel::new().script(MockResponse::text("[]")));
809        let s = MemorySynthesizer::new(mem.clone(), synth);
810        let mut world = harness_context::default_world(std::env::temp_dir());
811
812        let out = ModelOutput {
813            text: Some("fluff".into()),
814            tool_calls: vec![],
815            usage: Usage::default(),
816            stop_reason: StopReason::EndTurn,
817            reasoning: None,
818        };
819        let _ = s.fire(&Event::PostModel { out: &out }, &mut world);
820        let _ = s.fire(&Event::TaskCompleted, &mut world);
821
822        // Give the spawned synth task time to run.
823        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
824        let stored = mem.store.lock().unwrap().clone();
825        assert!(stored.is_empty(), "expected nothing stored, got {stored:?}");
826    }
827
828    #[tokio::test]
829    async fn synthesizer_falls_back_to_synth_raw_when_json_unparseable() {
830        use harness_models::{MockModel, MockResponse};
831
832        let mem = Arc::new(VecMemory::default());
833        let synth: Arc<dyn Model> = Arc::new(MockModel::new().script(MockResponse::text(
834            "The user said they like coffee. I think that's important.",
835        )));
836        let s = MemorySynthesizer::new(mem.clone(), synth);
837        let mut world = harness_context::default_world(std::env::temp_dir());
838
839        let out = ModelOutput {
840            text: Some("session chat".into()),
841            tool_calls: vec![],
842            usage: Usage::default(),
843            stop_reason: StopReason::EndTurn,
844            reasoning: None,
845        };
846        let _ = s.fire(&Event::PostModel { out: &out }, &mut world);
847        let _ = s.fire(&Event::TaskCompleted, &mut world);
848
849        for _ in 0..50 {
850            if !mem.store.lock().unwrap().is_empty() {
851                break;
852            }
853            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
854        }
855        let stored = mem.store.lock().unwrap().clone();
856        assert_eq!(stored.len(), 1);
857        assert!(stored[0].tags.contains(&"synth-raw".to_string()));
858        assert!(stored[0].content.contains("coffee"));
859    }
860}