1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
//! Episode completion and storage
use crate::error::{Error, Result};
use crate::security::audit::{AuditContext, episode_completed};
use crate::types::TaskOutcome;
use std::sync::Arc;
use tracing::{debug, info, instrument, warn};
use uuid::Uuid;
use super::SelfLearningMemory;
impl SelfLearningMemory {
/// Complete an episode and trigger learning analysis.
///
/// Finalizes the episode by recording the outcome, then performs the learning
/// cycle:
/// 1. **Marks complete** - Sets end time and outcome
/// 2. **Calculates reward** - Scores based on success, efficiency, complexity
/// 3. **Generates reflection** - Identifies successes, improvements, insights
/// 4. **Extracts patterns** - Finds reusable patterns from execution steps
/// 5. **Stores everything** - Persists to all storage backends
///
/// This is the core learning step. Patterns extracted here become available
/// for future task retrieval.
///
/// # Pattern Extraction Modes
///
/// - **Synchronous** (default): Patterns extracted before returning
/// - **Asynchronous**: If [`enable_async_extraction()`](SelfLearningMemory::enable_async_extraction)
/// was called, patterns are extracted in background workers
///
/// # Arguments
///
/// * `episode_id` - ID returned from [`start_episode()`](SelfLearningMemory::start_episode)
/// * `outcome` - Final outcome describing success/failure and artifacts
///
/// # Returns
///
/// `Ok(())` on success, or an error if the episode doesn't exist.
///
/// # Errors
///
/// Returns [`Error::NotFound`] if the episode ID doesn't exist.
///
/// # Examples
///
/// ```
/// use do_memory_core::{SelfLearningMemory, TaskContext, TaskType, TaskOutcome};
/// use do_memory_core::{ExecutionStep, ExecutionResult};
///
/// # async fn example() -> anyhow::Result<()> {
/// let memory = SelfLearningMemory::new();
///
/// // Start episode
/// let episode_id = memory.start_episode(
/// "Fix authentication bug".to_string(),
/// TaskContext::default(),
/// TaskType::Debugging,
/// ).await;
///
/// // Log debugging steps
/// let mut step = ExecutionStep::new(1, "debugger".to_string(), "Identify issue".to_string());
/// step.result = Some(ExecutionResult::Success {
/// output: "Found null pointer in auth handler".to_string(),
/// });
/// memory.log_step(episode_id, step).await;
///
/// // Complete with success
/// memory.complete_episode(
/// episode_id,
/// TaskOutcome::Success {
/// verdict: "Bug fixed, tests added".to_string(),
/// artifacts: vec!["auth_fix.patch".to_string(), "auth_test.rs".to_string()],
/// },
/// ).await?;
///
/// // Episode now has reward, reflection, and patterns
/// let episode = memory.get_episode(episode_id).await?;
/// assert!(episode.reward.is_some());
/// assert!(episode.reflection.is_some());
/// # Ok(())
/// # }
/// ```
#[instrument(skip(self, outcome), fields(episode_id = %episode_id))]
pub async fn complete_episode(&self, episode_id: Uuid, outcome: TaskOutcome) -> Result<()> {
// Flush any buffered steps before completing the episode
// This ensures all steps are persisted and available for analysis
if self.batch_config().is_some() {
debug!(
episode_id = %episode_id,
"Flushing buffered steps before episode completion"
);
self.flush_steps(episode_id).await?;
}
// Get the episode Arc and clone it to work with Episode directly
let episode_arc = {
let episodes = self.episodes_fallback.read().await;
episodes
.get(&episode_id)
.cloned()
.ok_or(Error::NotFound(episode_id))?
};
let mut episode = (*episode_arc).clone();
// Mark episode as complete
episode.complete(outcome.clone());
// Validate total episode size before processing
super::validation::validate_episode_size(&episode)?;
// ============================================================================
// Pre-Storage Reasoning (PREMem Phase 1)
// ============================================================================
// 1. Assess episode quality before storage
let quality_score = self.quality_assessor.assess_episode(&episode);
info!(
episode_id = %episode_id,
quality_score = quality_score,
quality_threshold = self.quality_threshold(),
"Assessed episode quality"
);
// 2. Check if episode meets quality threshold
if quality_score < self.quality_threshold() {
warn!(
episode_id = %episode_id,
quality_score = quality_score,
quality_threshold = self.quality_threshold(),
"Episode rejected: quality score below threshold"
);
// Return error - episode will not be stored
return Err(Error::ValidationFailed(format!(
"Episode quality score ({:.2}) below threshold ({:.2})",
quality_score,
self.quality_threshold()
)));
}
// 3. Extract salient features for high-quality episodes
let salient_features = self.salient_extractor.extract(&episode);
episode.salient_features = Some(salient_features.clone());
debug!(
episode_id = %episode_id,
feature_count = salient_features.count(),
critical_decisions = salient_features.critical_decisions.len(),
tool_combinations = salient_features.tool_combinations.len(),
error_recovery_patterns = salient_features.error_recovery_patterns.len(),
key_insights = salient_features.key_insights.len(),
"Extracted salient features"
);
// ============================================================================
// Learning Analysis (Existing Workflow)
// ============================================================================
// Calculate reward score
let reward = self.reward_calculator.calculate(&episode);
episode.reward = Some(reward.clone());
info!(
episode_id = %episode_id,
reward_total = reward.total,
reward_base = reward.base,
reward_efficiency = reward.efficiency,
"Calculated reward score"
);
// Generate reflection
let reflection = self.reflection_generator.generate(&episode);
episode.reflection = Some(reflection.clone());
debug!(
successes = reflection.successes.len(),
improvements = reflection.improvements.len(),
insights = reflection.insights.len(),
"Generated reflection"
);
// ============================================================================
// Phase 2 (GENESIS) - Semantic Summarization
// ============================================================================
// Generate semantic summary before storage (if enabled)
let summary = if let Some(ref summarizer) = self.semantic_summarizer {
match summarizer.summarize_episode(&episode).await {
Ok(summary) => {
info!(
episode_id = %episode_id,
summary_words = summary.summary_text.split_whitespace().count(),
key_concepts = summary.key_concepts.len(),
key_steps = summary.key_steps.len(),
"Generated semantic summary"
);
Some(summary)
}
Err(e) => {
warn!("Failed to generate semantic summary: {}", e);
None
}
}
} else {
None
};
// ============================================================================
// Phase 2 (GENESIS) - Capacity-Constrained Storage
// ============================================================================
// Store with capacity enforcement if configured, otherwise use normal storage
if let Some(ref capacity_mgr) = self.capacity_manager {
// Get all episodes EXCEPT the current one for capacity calculation
// (the current episode is being added, so we check if we need to evict others)
let (current_count, all_episodes) = {
let eps = self.episodes_fallback.read().await;
let episodes: Vec<_> = eps
.iter()
.filter(|(id, _)| **id != episode_id) // Exclude current episode
.map(|(_, ep)| (**ep).clone()) // Dereference twice: &Arc<Episode> -> Episode
.collect();
(episodes.len(), episodes)
};
// Check if eviction is needed
if !capacity_mgr.can_store(current_count) {
let evicted_ids = capacity_mgr.evict_if_needed(&all_episodes);
if !evicted_ids.is_empty() {
info!(
episode_id = %episode_id,
evicted_count = evicted_ids.len(),
"Evicting episodes due to capacity constraints"
);
// Remove evicted episodes from in-memory storage
{
let mut episodes_map = self.episodes_fallback.write().await;
for evicted_id in &evicted_ids {
episodes_map.remove(evicted_id);
}
}
// Remove from storage backends
// Note: In Phase 2.4, storage backends will have store_episode_with_capacity()
// For now, we just log the eviction
debug!(
evicted_ids = ?evicted_ids,
"Episodes evicted (backend deletion to be implemented in Phase 2.4)"
);
// Phase 3: Remove evicted episodes from spatiotemporal index
if let Some(ref index) = self.spatiotemporal_index {
if let Ok(mut index_write) = index.try_write() {
let mut removed_count = 0;
for evicted_id in &evicted_ids {
index_write.remove(*evicted_id);
removed_count += 1;
}
debug!(
evicted_count = removed_count,
"Removed evicted episodes from spatiotemporal index"
);
}
}
}
}
}
// Use the episode for storage operations
let episode_ref = &episode;
// Store updated episode in backends
if let Some(cache) = &self.cache_storage {
if let Err(e) = cache.store_episode(episode_ref).await {
warn!("Failed to store completed episode in cache: {}", e);
}
}
if let Some(turso) = &self.turso_storage {
if let Err(e) = turso.store_episode(episode_ref).await {
warn!("Failed to store completed episode in Turso: {}", e);
}
}
// Store episode summary if generated
// ADR-044 Feature 1: Persist semantic summary for playbook generation
if let Some(ref summary) = summary {
// Store in summaries cache for retrieval during playbook generation
{
let mut summaries = self.summaries_fallback.write().await;
summaries.insert(episode_id, summary.clone());
}
info!(
episode_id = %episode_id,
summary_words = summary.summary_text.split_whitespace().count(),
key_concepts = summary.key_concepts.len(),
"Stored semantic summary"
);
}
// ============================================================================
// Phase 3 (Spatiotemporal) - Update hierarchical index
// ============================================================================
// Update spatiotemporal index if enabled
if let Some(ref index) = self.spatiotemporal_index {
if let Ok(mut index_write) = index.try_write() {
index_write.insert(episode_ref);
debug!(
episode_id = %episode_id,
domain = %episode.context.domain,
task_type = %episode.task_type,
"Inserted episode into spatiotemporal index"
);
} else {
debug!(
episode_id = %episode_id,
"Spatiotemporal index locked, skipping indexing"
);
}
}
// ============================================================================
// Semantic Search - Generate and store embedding
// ============================================================================
// Generate and store embedding for semantic search
if let Some(ref semantic) = self.semantic_service {
if let Err(e) = semantic.embed_episode(episode_ref).await {
warn!(
episode_id = %episode_id,
error = %e,
"Failed to generate embedding for episode. Continuing without embedding."
);
// Don't fail entire operation on embedding error
} else {
debug!(
episode_id = %episode_id,
"Successfully generated embedding for episode"
);
}
}
// ============================================================================
// v0.1.12: Invalidate Query Cache
// ============================================================================
// Invalidate all cached queries since we added a new episode
// This ensures future retrievals will include the new episode
let metrics_before = self.query_cache.metrics();
self.query_cache.invalidate_all();
info!(
episode_id = %episode_id,
invalidated_entries = metrics_before.size,
total_invalidations = metrics_before.invalidations + metrics_before.size as u64,
"Invalidated query cache after episode completion"
);
// ============================================================================
// Re-insert the updated episode into the in-memory cache
// ============================================================================
{
let mut episodes = self.episodes_fallback.write().await;
episodes.insert(episode_id, Arc::new(episode));
}
// Extract patterns - async if queue enabled, sync otherwise
if let Some(queue) = &self.pattern_queue {
// Async path: enqueue for background processing
queue.enqueue_episode(episode_id).await?;
info!(
episode_id = %episode_id,
"Episode completed, enqueued for async pattern extraction"
);
} else {
// Sync path: extract patterns immediately
self.extract_patterns_sync(episode_id).await?;
info!(
episode_id = %episode_id,
"Episode completed and patterns extracted synchronously"
);
}
// Audit log: episode completed
let context = AuditContext::system();
let outcome_str = match &outcome {
TaskOutcome::Success { verdict, .. } => verdict.clone(),
TaskOutcome::PartialSuccess { verdict, .. } => verdict.clone(),
TaskOutcome::Failure { reason, .. } => reason.clone(),
};
let success = matches!(outcome, TaskOutcome::Success { .. });
let audit_entry = episode_completed(&context, episode_id, &outcome_str, success);
self.audit_logger.log(audit_entry);
Ok(())
}
}