Skip to main content

engram/context/
record.rs

1//! Operational Context write API.
2
3use chrono::{DateTime, Utc};
4use rusqlite::Connection;
5use serde::{Deserialize, Serialize};
6use serde_json::{json, Map, Value};
7use std::str::FromStr;
8
9use crate::context::artifact::{
10    ArtifactAccessPolicy, ArtifactRedactionStatus, ArtifactRetentionPolicy, NewContextArtifact,
11};
12use crate::context::metrics::{
13    estimate_json_tokens, estimate_tokens, estimated_savings_tokens, metrics_value,
14};
15use crate::context::policy::{
16    redact_field, redact_optional_field, redact_string_list, OperationalContextPolicy,
17    RedactedText, RedactionReport,
18};
19use crate::error::{EngramError, Result};
20use crate::storage::{
21    create_context_artifact, create_context_event, create_context_summary, get_context_event,
22    get_context_summary, ContextEvent, NewContextEvent, NewContextSummary,
23};
24
/// Request payload consumed by `record_context`.
///
/// `source`, `session_id`, and `event_type` are the only fields required at
/// deserialization time; every other field falls back to its default when
/// absent. `record_context` additionally rejects requests that carry neither
/// `repo_id` nor `workspace_path_hash`/`workspace`.
#[derive(Debug, Clone, Deserialize)]
pub struct ContextRecordRequest {
    /// Producer of the record. Must be non-blank; the value `"rtk"`
    /// (case-insensitive) marks the record as external.
    pub source: String,
    /// Optional producer version; also used as a fallback reducer version.
    #[serde(default)]
    pub source_version: Option<String>,
    /// Repository identifier. Either this or a workspace identifier is required.
    #[serde(default)]
    pub repo_id: Option<String>,
    /// Workspace identifier; takes precedence over `workspace`.
    #[serde(default)]
    pub workspace_path_hash: Option<String>,
    /// Fallback used for `workspace_path_hash` when that field is blank.
    #[serde(default)]
    pub workspace: Option<String>,
    #[serde(default)]
    pub git_branch: Option<String>,
    #[serde(default)]
    pub worktree_name: Option<String>,
    #[serde(default)]
    pub commit_hash: Option<String>,
    /// Session identifier. Must be non-blank.
    pub session_id: String,
    #[serde(default)]
    pub task_id: Option<String>,
    #[serde(default)]
    pub agent_id: Option<String>,
    /// Event kind. Must be non-blank; `"command"` and `"tool"`
    /// (case-insensitive) require a command name or tool name respectively.
    pub event_type: String,
    /// Fallback used for `command_name` when that field is blank.
    #[serde(default)]
    pub command: Option<String>,
    /// Command name; takes precedence over `command`.
    #[serde(default)]
    pub command_name: Option<String>,
    /// Fallback used for `tool_name` when that field is blank.
    #[serde(default)]
    pub tool: Option<String>,
    /// Tool name; takes precedence over `tool`.
    #[serde(default)]
    pub tool_name: Option<String>,
    #[serde(default)]
    pub cwd: Option<String>,
    /// Process exit code; rejected when it does not fit in an `i32`.
    #[serde(default)]
    pub exit_code: Option<i64>,
    /// Summary text. A summary row is only written when this is non-blank.
    #[serde(default)]
    pub summary: Option<String>,
    /// Stored (after redaction) under the `key_errors` metadata key when non-empty.
    #[serde(default)]
    pub key_errors: Vec<String>,
    /// Stored (after redaction) under the `touched_files` metadata key when non-empty.
    #[serde(default)]
    pub touched_files: Vec<String>,
    /// Optional reducer details; treated as `ContextReducerInput::default()` when absent.
    #[serde(default)]
    pub reducer: Option<ContextReducerInput>,
    /// External reducer name; takes precedence over `reducer.external_reducer`.
    /// Its presence marks the record as external.
    #[serde(default)]
    pub external_reducer: Option<String>,
    /// Pointer to raw data; takes precedence over `reducer.raw_pointer`.
    /// It is recorded but never dereferenced, and marks the record as external.
    #[serde(default)]
    pub raw_pointer: Option<String>,
    /// Explicit "external and unverified" flag. When absent it defaults to
    /// whether the record was classified as external.
    #[serde(default)]
    pub external_unverified: Option<bool>,
    /// Labels; merged with `reducer.labels`, then trimmed, lower-cased, and de-duplicated.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Retention policy name. Defaults to `"default"`; replaced by
    /// `"ephemeral_sensitive"` when the policy forces ephemeral retention.
    #[serde(default)]
    pub retention_policy: Option<String>,
    #[serde(default)]
    pub raw_artifact_id: Option<String>,
    /// Free-form metadata. A non-object value is wrapped under a `"value"` key.
    #[serde(default)]
    pub metadata: Option<Value>,
    /// RFC 3339 timestamp; defaults to the current time when absent or blank.
    #[serde(default)]
    pub started_at: Option<String>,
    /// Optional RFC 3339 timestamp.
    #[serde(default)]
    pub finished_at: Option<String>,
}
87
/// Optional reducer details attached to a `ContextRecordRequest`.
///
/// Every field is optional; `Default` yields an input with nothing set.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ContextReducerInput {
    /// Preferred reducer name for the summary row.
    #[serde(default)]
    pub name: Option<String>,
    /// Preferred reducer version for the summary row.
    #[serde(default)]
    pub version: Option<String>,
    /// Fallback for the request-level `external_reducer`.
    #[serde(default)]
    pub external_reducer: Option<String>,
    /// Fallback for the request-level `raw_pointer`.
    #[serde(default)]
    pub raw_pointer: Option<String>,
    /// Whether the summary is lossy. Defaults to `true`, and is forced to
    /// `true` for external records.
    #[serde(default)]
    pub lossy: Option<bool>,
    /// Summary confidence; must be finite and within `0.0..=1.0`. Defaults to
    /// `0.7` for external records and `1.0` otherwise.
    #[serde(default)]
    pub confidence: Option<f64>,
    /// Structured facts stored (after redaction) with the summary row.
    #[serde(default)]
    pub structured_facts: Option<Value>,
    /// Warnings stored (after redaction) with the summary row.
    #[serde(default)]
    pub warnings: Vec<String>,
    /// Labels merged with the request-level labels.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Caller-supplied token estimate for the raw input.
    #[serde(default)]
    pub tokens_raw_est: Option<i64>,
    /// Caller-supplied token estimate for the summary; estimated from the
    /// summary text when absent.
    #[serde(default)]
    pub tokens_compact_est: Option<i64>,
}
113
/// Identifiers of the rows associated with one `record_context` call.
#[derive(Debug, Clone, Serialize)]
pub struct ContextRecordCreatedIds {
    /// Id of the inserted context event.
    pub event_id: i64,
    /// Id of the inserted summary row, or `None` when no summary was written.
    pub summary_id: Option<i64>,
    /// Raw artifact id as read back from the stored event.
    pub raw_artifact_id: Option<String>,
}
120
/// Provenance fields echoed back in the record responses.
#[derive(Debug, Clone, Serialize)]
pub struct ProvenanceMetadata {
    pub source: String,
    pub source_version: Option<String>,
    pub repo_id: Option<String>,
    pub workspace_path_hash: Option<String>,
    pub session_id: Option<String>,
    pub task_id: Option<String>,
    pub agent_id: Option<String>,
    /// Creation timestamp taken from the stored event or artifact.
    pub created_at: DateTime<Utc>,
}
132
/// Token-estimate metrics reported in the record responses.
#[derive(Debug, Clone, Serialize)]
pub struct ContextRecordMetrics {
    /// Set to `true` by both record functions in this file.
    pub estimated: bool,
    /// Name of the estimation method; both record functions report
    /// `"chars_div_4_estimate_or_caller_supplied"`.
    pub method: String,
    pub observed_input_tokens_est: Option<i64>,
    pub summary_tokens_est: Option<i64>,
    pub stored_artifact_tokens_est: Option<i64>,
    pub estimated_savings_tokens: Option<i64>,
}
142
/// Response returned by `record_context`.
#[derive(Debug, Clone, Serialize)]
pub struct ContextRecordResponse {
    pub created_ids: ContextRecordCreatedIds,
    /// `"redacted"` when the redaction report recorded any redaction,
    /// otherwise `"passed"`.
    pub redaction_status: String,
    /// Retention policy name that was applied to the event.
    pub retention_policy: String,
    pub provenance: ProvenanceMetadata,
    pub metrics: ContextRecordMetrics,
}
151
/// Request payload consumed by `record_context_artifact`.
///
/// Only `kind` is required at deserialization time. The function additionally
/// requires at least one of `source_event_id`, `repo_id`, or
/// `workspace_path_hash`/`workspace`.
#[derive(Debug, Clone, Deserialize)]
pub struct ContextRecordArtifactRequest {
    #[serde(default)]
    pub id: Option<String>,
    #[serde(default)]
    pub source_event_id: Option<i64>,
    /// Producer name; defaults to `"context_record_artifact"` when absent or blank.
    #[serde(default)]
    pub source: Option<String>,
    #[serde(default)]
    pub source_version: Option<String>,
    #[serde(default)]
    pub repo_id: Option<String>,
    /// Workspace identifier; takes precedence over `workspace`.
    #[serde(default)]
    pub workspace_path_hash: Option<String>,
    /// Fallback used for `workspace_path_hash` when that field is blank.
    #[serde(default)]
    pub workspace: Option<String>,
    #[serde(default)]
    pub session_id: Option<String>,
    #[serde(default)]
    pub task_id: Option<String>,
    #[serde(default)]
    pub agent_id: Option<String>,
    /// Artifact kind. Must be non-blank.
    pub kind: String,
    #[serde(default)]
    pub label: Option<String>,
    /// Artifact URI; falls back to the (redacted) `raw_pointer` when blank.
    #[serde(default)]
    pub uri: Option<String>,
    /// Pointer to raw data; recorded in metadata and never dereferenced.
    #[serde(default)]
    pub raw_pointer: Option<String>,
    #[serde(default)]
    pub media_type: Option<String>,
    /// Raw content to retain. Only accepted together with `retain_raw = true`.
    #[serde(default)]
    pub raw_content: Option<String>,
    #[serde(default)]
    pub content_sha256: Option<String>,
    #[serde(default)]
    pub byte_len: Option<i64>,
    /// Retention policy name; defaults to `"raw_retained"` or `"pointer_only"`
    /// depending on `retain_raw`.
    #[serde(default)]
    pub retention_policy: Option<String>,
    /// Access policy name, parsed with `ArtifactAccessPolicy::from_str`;
    /// the type's default is used when absent or blank.
    #[serde(default)]
    pub access_policy: Option<String>,
    /// Requests raw retention. Defaults to `false`; when `true`, `raw_content`
    /// is required and the policy must allow raw retention for the command context.
    #[serde(default)]
    pub retain_raw: Option<bool>,
    #[serde(default)]
    pub ttl_seconds: Option<i64>,
    #[serde(default)]
    pub stale_after_seconds: Option<i64>,
    /// Free-form metadata. A non-object value is wrapped under a `"value"` key.
    /// String values under the `"command"` / `"command_name"` keys are used as
    /// the command hint for sensitivity analysis.
    #[serde(default)]
    pub metadata: Option<Value>,
}
202
/// Response returned by `record_context_artifact`.
#[derive(Debug, Clone, Serialize)]
pub struct ContextRecordArtifactResponse {
    /// Id of the stored artifact.
    pub artifact_id: String,
    /// `"raw_retained"` when the stored artifact retains raw content,
    /// otherwise `"pointer_only"`.
    pub storage_kind: String,
    /// String form of the stored artifact's redaction status.
    pub redaction_status: String,
    /// Retention policy name that was applied to the artifact.
    pub retention_policy: String,
    pub provenance: ProvenanceMetadata,
    pub metrics: ContextRecordMetrics,
}
212
213pub fn record_context(
214    conn: &Connection,
215    policy: &OperationalContextPolicy,
216    request: ContextRecordRequest,
217) -> Result<ContextRecordResponse> {
218    let mut report = RedactionReport::new();
219    let source_raw = require_non_empty(request.source, "source")?;
220    let source = redact_field_result(policy, &mut report, "source", &source_raw)?;
221    let source_version = redact_optional_result(
222        policy,
223        &mut report,
224        "source_version",
225        clean_optional(request.source_version),
226    )?;
227    let session_id_raw = require_non_empty(request.session_id, "session_id")?;
228    let session_id = redact_field_result(policy, &mut report, "session_id", &session_id_raw)?;
229    let event_type_raw = require_non_empty(request.event_type, "event_type")?;
230    let event_type = redact_field_result(policy, &mut report, "event_type", &event_type_raw)?;
231
232    let repo_id = redact_optional_result(
233        policy,
234        &mut report,
235        "repo_id",
236        clean_optional(request.repo_id),
237    )?;
238    let workspace_path_hash = redact_optional_result(
239        policy,
240        &mut report,
241        "workspace_path_hash",
242        clean_optional(request.workspace_path_hash).or_else(|| clean_optional(request.workspace)),
243    )?;
244    if repo_id.is_none() && workspace_path_hash.is_none() {
245        return Err(EngramError::InvalidInput(
246            "context_record requires repo_id or workspace_path_hash/workspace".to_string(),
247        ));
248    }
249
250    let command_for_analysis = clean_optional(request.command_name.clone())
251        .or_else(|| clean_optional(request.command.clone()));
252    let sensitive = policy.analyze_command(command_for_analysis.as_deref());
253    let command_name = redact_optional_result(
254        policy,
255        &mut report,
256        "command_name",
257        clean_optional(request.command_name).or_else(|| clean_optional(request.command)),
258    )?;
259    let tool_name = redact_optional_result(
260        policy,
261        &mut report,
262        "tool_name",
263        clean_optional(request.tool_name).or_else(|| clean_optional(request.tool)),
264    )?;
265
266    if event_type.eq_ignore_ascii_case("command") && command_name.is_none() {
267        return Err(EngramError::InvalidInput(
268            "context_record command events require command or command_name".to_string(),
269        ));
270    }
271    if event_type.eq_ignore_ascii_case("tool") && tool_name.is_none() {
272        return Err(EngramError::InvalidInput(
273            "context_record tool events require tool or tool_name".to_string(),
274        ));
275    }
276
277    let exit_code = optional_i32(request.exit_code, "exit_code")?;
278    let summary = redact_optional_result(
279        policy,
280        &mut report,
281        "summary",
282        clean_optional(request.summary),
283    )?;
284    let key_errors =
285        redact_string_list_result(policy, &mut report, "key_errors", &request.key_errors)?;
286    let touched_files =
287        redact_string_list_result(policy, &mut report, "touched_files", &request.touched_files)?;
288    let raw_artifact_id = redact_optional_result(
289        policy,
290        &mut report,
291        "raw_artifact_id",
292        clean_optional(request.raw_artifact_id),
293    )?;
294    let raw_pointer = redact_optional_result(
295        policy,
296        &mut report,
297        "raw_pointer",
298        clean_optional(request.raw_pointer.clone()).or_else(|| {
299            request
300                .reducer
301                .as_ref()
302                .and_then(|reducer| clean_optional(reducer.raw_pointer.clone()))
303        }),
304    )?;
305
306    let started_at = parse_datetime_or_now(request.started_at, "started_at")?;
307    let finished_at = parse_optional_datetime(request.finished_at, "finished_at")?;
308    let mut retention_policy =
309        clean_optional(request.retention_policy).unwrap_or_else(|| "default".to_string());
310    if policy.force_ephemeral(&sensitive) {
311        retention_policy = "ephemeral_sensitive".to_string();
312    }
313
314    let mut metadata = metadata_map(request.metadata);
315    let metadata_value =
316        redact_json_value(policy, &mut report, "metadata", Value::Object(metadata))?;
317    metadata = object_map(metadata_value);
318    insert_opt(&mut metadata, "source_version", source_version.clone());
319    insert_opt(&mut metadata, "raw_pointer", raw_pointer.clone());
320    if !key_errors.is_empty() {
321        metadata.insert("key_errors".to_string(), json!(key_errors));
322    }
323    if !touched_files.is_empty() {
324        metadata.insert("touched_files".to_string(), json!(touched_files));
325    }
326
327    let reducer = request.reducer.unwrap_or_default();
328    let external_reducer = redact_optional_result(
329        policy,
330        &mut report,
331        "external_reducer",
332        clean_optional(request.external_reducer)
333            .or_else(|| clean_optional(reducer.external_reducer.clone())),
334    )?;
335    let is_external = source.eq_ignore_ascii_case("rtk")
336        || external_reducer.is_some()
337        || raw_pointer.is_some()
338        || request.external_unverified.unwrap_or(false);
339    let external_unverified = request.external_unverified.unwrap_or(is_external);
340    let mut labels = normalized_labels(request.labels.into_iter().chain(reducer.labels.clone()));
341    if is_external {
342        push_label(&mut labels, "derived");
343        push_label(&mut labels, "lossy");
344        if external_unverified {
345            push_label(&mut labels, "external_unverified");
346        }
347    }
348    if !labels.is_empty() {
349        metadata.insert("labels".to_string(), json!(labels.clone()));
350    }
351    if is_external {
352        metadata.insert(
353            "external_summary".to_string(),
354            json!({
355                "source": source,
356                "source_version": source_version,
357                "external_reducer": external_reducer,
358                "raw_pointer": raw_pointer,
359                "labels": labels,
360                "external_unverified": external_unverified,
361                "pointer_dereferenced": false
362            }),
363        );
364    }
365
366    let observed_input_tokens_est = reducer
367        .tokens_raw_est
368        .or_else(|| Some(estimate_json_tokens(&Value::Object(metadata.clone()))));
369    let summary_tokens_est = summary.as_deref().map(estimate_tokens);
370    metadata.insert(
371        "metrics".to_string(),
372        metrics_value(observed_input_tokens_est, None, summary_tokens_est),
373    );
374    metadata.insert(
375        "redaction".to_string(),
376        report.to_value(policy, &sensitive, "raw_payload_not_accepted"),
377    );
378
379    let redaction_status = if report.has_redactions() {
380        "redacted"
381    } else {
382        "passed"
383    };
384    let event_metadata = Value::Object(metadata);
385    let event_id = create_context_event(
386        conn,
387        &NewContextEvent {
388            repo_id: repo_id.as_deref(),
389            workspace_path_hash: workspace_path_hash.as_deref(),
390            git_branch: clean_optional(request.git_branch).as_deref(),
391            worktree_name: clean_optional(request.worktree_name).as_deref(),
392            commit_hash: clean_optional(request.commit_hash).as_deref(),
393            session_id: &session_id,
394            task_id: clean_optional(request.task_id.clone()).as_deref(),
395            agent_id: clean_optional(request.agent_id.clone()).as_deref(),
396            source: &source,
397            event_type: &event_type,
398            command_name: command_name.as_deref(),
399            tool_name: tool_name.as_deref(),
400            cwd: clean_optional(request.cwd).as_deref(),
401            exit_code,
402            started_at,
403            finished_at,
404            redaction_status,
405            retention_policy: &retention_policy,
406            raw_artifact_id: raw_artifact_id.as_deref(),
407            raw_payload: None,
408            metadata: &event_metadata,
409        },
410    )?;
411    let event = load_event(conn, event_id)?;
412
413    let mut summary_id = None;
414    if let Some(summary_text) = summary.filter(|value| !value.trim().is_empty()) {
415        let confidence = reducer
416            .confidence
417            .unwrap_or(if is_external { 0.7 } else { 1.0 });
418        if !confidence.is_finite() || !(0.0..=1.0).contains(&confidence) {
419            return Err(EngramError::InvalidInput(
420                "reducer confidence must be between 0.0 and 1.0".to_string(),
421            ));
422        }
423
424        let mut structured = object_map(
425            reducer
426                .structured_facts
427                .map(|value| redact_json_value(policy, &mut report, "structured_facts", value))
428                .transpose()?
429                .unwrap_or_else(|| json!({})),
430        );
431        if let Some(files) = event.metadata.get("touched_files") {
432            structured.insert("touched_files".to_string(), files.clone());
433        }
434        if let Some(errors) = event.metadata.get("key_errors") {
435            structured.insert("key_errors".to_string(), errors.clone());
436        }
437        if let Some(external) = event.metadata.get("external_summary") {
438            structured.insert("external_summary".to_string(), external.clone());
439        }
440        if let Some(labels) = event.metadata.get("labels") {
441            structured.insert("labels".to_string(), labels.clone());
442        }
443
444        let mut warnings =
445            redact_string_list_result(policy, &mut report, "warnings", &reducer.warnings)?;
446        if external_unverified {
447            push_unique(&mut warnings, "external_unverified");
448        }
449        if raw_pointer.is_some() {
450            push_unique(&mut warnings, "raw_pointer_not_dereferenced");
451        }
452        let tokens_compact_est = reducer
453            .tokens_compact_est
454            .or_else(|| Some(estimate_tokens(&summary_text)));
455        let summary_row_id = create_context_summary(
456            conn,
457            &NewContextSummary {
458                source_event_id: event.id,
459                source_artifact_id: event.raw_artifact_id.as_deref(),
460                reducer_name: reducer_name(
461                    &source,
462                    external_reducer.as_deref(),
463                    reducer.name.as_deref(),
464                ),
465                reducer_version: reducer_version(
466                    source_version.as_deref(),
467                    reducer.version.as_deref(),
468                ),
469                lossy: if is_external {
470                    true
471                } else {
472                    reducer.lossy.unwrap_or(true)
473                },
474                confidence,
475                summary: &summary_text,
476                structured_facts: &Value::Object(structured),
477                warnings: &json!(warnings),
478                tokens_raw_est: reducer.tokens_raw_est,
479                tokens_compact_est,
480            },
481        )?;
482        summary_id = Some(load_summary_id(conn, summary_row_id)?);
483    }
484
485    Ok(ContextRecordResponse {
486        created_ids: ContextRecordCreatedIds {
487            event_id: event.id,
488            summary_id,
489            raw_artifact_id: event.raw_artifact_id.clone(),
490        },
491        redaction_status: redaction_status.to_string(),
492        retention_policy,
493        provenance: ProvenanceMetadata {
494            source,
495            source_version,
496            repo_id,
497            workspace_path_hash,
498            session_id: Some(session_id),
499            task_id: clean_optional(request.task_id),
500            agent_id: clean_optional(request.agent_id),
501            created_at: event.created_at,
502        },
503        metrics: ContextRecordMetrics {
504            estimated: true,
505            method: "chars_div_4_estimate_or_caller_supplied".to_string(),
506            observed_input_tokens_est,
507            summary_tokens_est,
508            stored_artifact_tokens_est: None,
509            estimated_savings_tokens: estimated_savings_tokens(
510                observed_input_tokens_est,
511                summary_tokens_est,
512            ),
513        },
514    })
515}
516
517pub fn record_context_artifact(
518    conn: &Connection,
519    policy: &OperationalContextPolicy,
520    request: ContextRecordArtifactRequest,
521) -> Result<ContextRecordArtifactResponse> {
522    let mut report = RedactionReport::new();
523    let kind_raw = require_non_empty(request.kind, "kind")?;
524    let kind = redact_field_result(policy, &mut report, "kind", &kind_raw)?;
525    let repo_id = redact_optional_result(
526        policy,
527        &mut report,
528        "repo_id",
529        clean_optional(request.repo_id),
530    )?;
531    let workspace_path_hash = redact_optional_result(
532        policy,
533        &mut report,
534        "workspace_path_hash",
535        clean_optional(request.workspace_path_hash).or_else(|| clean_optional(request.workspace)),
536    )?;
537    if request.source_event_id.is_none() && repo_id.is_none() && workspace_path_hash.is_none() {
538        return Err(EngramError::InvalidInput(
539            "context_record_artifact requires source_event_id, repo_id, or workspace_path_hash/workspace"
540                .to_string(),
541        ));
542    }
543
544    let mut metadata = metadata_map(request.metadata);
545    let command_hint = metadata
546        .get("command")
547        .and_then(Value::as_str)
548        .map(str::to_string)
549        .or_else(|| {
550            metadata
551                .get("command_name")
552                .and_then(Value::as_str)
553                .map(str::to_string)
554        });
555    let sensitive = policy.analyze_command(command_hint.as_deref());
556    let retain_raw_requested = request.retain_raw.unwrap_or(false);
557    if retain_raw_requested && request.raw_content.is_none() {
558        return Err(EngramError::InvalidInput(
559            "context_record_artifact retain_raw=true requires raw_content".to_string(),
560        ));
561    }
562    if request.raw_content.is_some() && !retain_raw_requested {
563        return Err(EngramError::InvalidInput(
564            "context_record_artifact raw_content requires retain_raw=true; pointer-only is the default"
565                .to_string(),
566        ));
567    }
568    if retain_raw_requested && !policy.allow_raw_for(&sensitive) {
569        return Err(EngramError::InvalidInput(
570            "context_record_artifact raw retention is blocked for sensitive command context"
571                .to_string(),
572        ));
573    }
574
575    let source = redact_optional_result(
576        policy,
577        &mut report,
578        "source",
579        clean_optional(request.source),
580    )?
581    .unwrap_or_else(|| "context_record_artifact".to_string());
582    let source_version = redact_optional_result(
583        policy,
584        &mut report,
585        "source_version",
586        clean_optional(request.source_version),
587    )?;
588    let raw_pointer = redact_optional_result(
589        policy,
590        &mut report,
591        "raw_pointer",
592        clean_optional(request.raw_pointer),
593    )?;
594    let uri = redact_optional_result(
595        policy,
596        &mut report,
597        "uri",
598        clean_optional(request.uri).or_else(|| raw_pointer.clone()),
599    )?;
600    let raw_content = request
601        .raw_content
602        .as_deref()
603        .map(|content| redact_field_result(policy, &mut report, "raw_content", content))
604        .transpose()?;
605    let observed_input_tokens_est = raw_content.as_deref().map(estimate_tokens);
606    let raw_bytes = raw_content.map(String::into_bytes);
607    let stored_artifact_tokens_est = raw_bytes
608        .as_ref()
609        .map(|bytes| estimate_tokens(&String::from_utf8_lossy(bytes)));
610    let metadata_value =
611        redact_json_value(policy, &mut report, "metadata", Value::Object(metadata))?;
612    metadata = object_map(metadata_value);
613    metadata.insert("source".to_string(), json!(source));
614    insert_opt(&mut metadata, "source_version", source_version.clone());
615    insert_opt(&mut metadata, "raw_pointer", raw_pointer.clone());
616    metadata.insert("pointer_dereferenced".to_string(), json!(false));
617    metadata.insert(
618        "metrics".to_string(),
619        metrics_value(observed_input_tokens_est, stored_artifact_tokens_est, None),
620    );
621    metadata.insert(
622        "redaction".to_string(),
623        report.to_value(
624            policy,
625            &sensitive,
626            if retain_raw_requested {
627                "raw_retained_after_redaction"
628            } else {
629                "pointer_only"
630            },
631        ),
632    );
633
634    let redaction_status = if report.has_redactions() {
635        ArtifactRedactionStatus::Redacted
636    } else if raw_bytes.is_some() {
637        ArtifactRedactionStatus::Passed
638    } else {
639        ArtifactRedactionStatus::NotRequired
640    };
641    let access_policy = clean_optional(request.access_policy)
642        .as_deref()
643        .map(ArtifactAccessPolicy::from_str)
644        .transpose()?
645        .unwrap_or_default();
646    let retention_policy_name = clean_optional(request.retention_policy).unwrap_or_else(|| {
647        if retain_raw_requested {
648            "raw_retained".to_string()
649        } else {
650            "pointer_only".to_string()
651        }
652    });
653    let artifact = create_context_artifact(
654        conn,
655        NewContextArtifact {
656            id: clean_optional(request.id),
657            source_event_id: request.source_event_id,
658            repo_id: repo_id.clone(),
659            workspace_path_hash: workspace_path_hash.clone(),
660            session_id: clean_optional(request.session_id.clone()),
661            task_id: clean_optional(request.task_id.clone()),
662            agent_id: clean_optional(request.agent_id.clone()),
663            kind,
664            label: redact_optional_result(
665                policy,
666                &mut report,
667                "label",
668                clean_optional(request.label),
669            )?,
670            uri,
671            media_type: redact_optional_result(
672                policy,
673                &mut report,
674                "media_type",
675                clean_optional(request.media_type),
676            )?,
677            content_sha256: clean_optional(request.content_sha256),
678            byte_len: request.byte_len,
679            raw_content: raw_bytes,
680            retention: ArtifactRetentionPolicy {
681                policy_name: retention_policy_name.clone(),
682                retain_raw: retain_raw_requested,
683                redaction_status,
684                ttl_seconds: request.ttl_seconds,
685                stale_after_seconds: request.stale_after_seconds,
686                access_policy,
687            },
688            metadata: Value::Object(metadata),
689        },
690    )?;
691
692    Ok(ContextRecordArtifactResponse {
693        artifact_id: artifact.id,
694        storage_kind: if artifact.retain_raw {
695            "raw_retained".to_string()
696        } else {
697            "pointer_only".to_string()
698        },
699        redaction_status: artifact.redaction_status.as_str().to_string(),
700        retention_policy: retention_policy_name,
701        provenance: ProvenanceMetadata {
702            source,
703            source_version,
704            repo_id,
705            workspace_path_hash,
706            session_id: clean_optional(request.session_id),
707            task_id: clean_optional(request.task_id),
708            agent_id: clean_optional(request.agent_id),
709            created_at: artifact.created_at,
710        },
711        metrics: ContextRecordMetrics {
712            estimated: true,
713            method: "chars_div_4_estimate_or_caller_supplied".to_string(),
714            observed_input_tokens_est,
715            summary_tokens_est: None,
716            stored_artifact_tokens_est,
717            estimated_savings_tokens: estimated_savings_tokens(
718                observed_input_tokens_est,
719                stored_artifact_tokens_est,
720            ),
721        },
722    })
723}
724
725fn reducer_name<'a>(
726    source: &'a str,
727    external_reducer: Option<&'a str>,
728    reducer_name: Option<&'a str>,
729) -> &'a str {
730    reducer_name
731        .and_then(non_empty)
732        .or_else(|| external_reducer.and_then(non_empty))
733        .unwrap_or(if source.eq_ignore_ascii_case("rtk") {
734            "rtk_external_summary"
735        } else {
736            "context_record"
737        })
738}
739
740fn reducer_version<'a>(
741    source_version: Option<&'a str>,
742    reducer_version: Option<&'a str>,
743) -> &'a str {
744    reducer_version
745        .and_then(non_empty)
746        .or_else(|| source_version.and_then(non_empty))
747        .unwrap_or("1")
748}
749
/// Returns `value` unchanged (not trimmed) unless it is empty or
/// whitespace-only, in which case `None` is returned.
fn non_empty(value: &str) -> Option<&str> {
    let has_content = !value.trim().is_empty();
    has_content.then_some(value)
}
757
758fn load_event(conn: &Connection, event_id: i64) -> Result<ContextEvent> {
759    get_context_event(conn, event_id)?
760        .ok_or_else(|| EngramError::Internal("context event insert was not readable".to_string()))
761}
762
763fn load_summary_id(conn: &Connection, summary_id: i64) -> Result<i64> {
764    get_context_summary(conn, summary_id)?
765        .map(|summary| summary.id)
766        .ok_or_else(|| EngramError::Internal("context summary insert was not readable".to_string()))
767}
768
769fn redact_field_result(
770    policy: &OperationalContextPolicy,
771    report: &mut RedactionReport,
772    field: &str,
773    value: &str,
774) -> Result<String> {
775    redact_field(policy, report, field, value).map_err(redaction_error)
776}
777
778fn redact_optional_result(
779    policy: &OperationalContextPolicy,
780    report: &mut RedactionReport,
781    field: &str,
782    value: Option<String>,
783) -> Result<Option<String>> {
784    redact_optional_field(policy, report, field, &value).map_err(redaction_error)
785}
786
787fn redact_string_list_result(
788    policy: &OperationalContextPolicy,
789    report: &mut RedactionReport,
790    field: &str,
791    values: &[String],
792) -> Result<Vec<String>> {
793    redact_string_list(policy, report, field, values).map_err(redaction_error)
794}
795
796fn redact_json_value(
797    policy: &OperationalContextPolicy,
798    report: &mut RedactionReport,
799    field: &str,
800    value: Value,
801) -> Result<Value> {
802    match value {
803        Value::String(value) => {
804            redact_field_result(policy, report, field, &value).map(Value::String)
805        }
806        Value::Array(values) => values
807            .into_iter()
808            .enumerate()
809            .map(|(idx, value)| {
810                redact_json_value(policy, report, &format!("{field}[{idx}]"), value)
811            })
812            .collect::<Result<Vec<_>>>()
813            .map(Value::Array),
814        Value::Object(values) => {
815            let mut output = Map::new();
816            for (key, value) in values {
817                let nested = format!("{field}.{key}");
818                if sensitive_key(&key) {
819                    report.record(
820                        &nested,
821                        &RedactedText {
822                            text: String::new(),
823                            redacted: true,
824                            classes: vec!["metadata_sensitive_key".to_string()],
825                        },
826                    );
827                    output.insert(
828                        key,
829                        Value::String("[REDACTED:metadata_sensitive_key]".to_string()),
830                    );
831                } else {
832                    output.insert(key, redact_json_value(policy, report, &nested, value)?);
833                }
834            }
835            Ok(Value::Object(output))
836        }
837        value => Ok(value),
838    }
839}
840
841fn metadata_map(metadata: Option<Value>) -> Map<String, Value> {
842    match metadata {
843        Some(Value::Object(map)) => map,
844        Some(value) => {
845            let mut map = Map::new();
846            map.insert("value".to_string(), value);
847            map
848        }
849        None => Map::new(),
850    }
851}
852
853fn object_map(value: Value) -> Map<String, Value> {
854    match value {
855        Value::Object(map) => map,
856        other => {
857            let mut map = Map::new();
858            map.insert("value".to_string(), other);
859            map
860        }
861    }
862}
863
864fn insert_opt(map: &mut Map<String, Value>, key: &str, value: Option<String>) {
865    if let Some(value) = value {
866        map.insert(key.to_string(), json!(value));
867    }
868}
869
/// Trims and ASCII-lower-cases each label, drops blank ones, and removes
/// duplicates while keeping the first occurrence's position.
fn normalized_labels(values: impl Iterator<Item = String>) -> Vec<String> {
    values
        .map(|raw| raw.trim().to_ascii_lowercase())
        .filter(|label| !label.is_empty())
        .fold(Vec::new(), |mut unique, label| {
            if !unique.contains(&label) {
                unique.push(label);
            }
            unique
        })
}
880
/// Appends `label` to `labels` unless an identical entry is already present.
fn push_label(labels: &mut Vec<String>, label: &str) {
    let already_present = labels.iter().any(|existing| existing.as_str() == label);
    if already_present {
        return;
    }
    labels.push(label.to_owned());
}
886
/// Appends `value` to `values` only when no existing entry equals it.
fn push_unique(values: &mut Vec<String>, value: &str) {
    let is_new = values.iter().all(|existing| existing.as_str() != value);
    if is_new {
        values.push(String::from(value));
    }
}
892
893fn require_non_empty(value: String, field: &str) -> Result<String> {
894    let value = value.trim().to_string();
895    if value.is_empty() {
896        Err(EngramError::InvalidInput(format!("{field} is required")))
897    } else {
898        Ok(value)
899    }
900}
901
/// Trims an optional string, collapsing `None` and blank input to `None`.
fn clean_optional(value: Option<String>) -> Option<String> {
    let raw = value?;
    let trimmed = raw.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}
912
913fn optional_i32(value: Option<i64>, field: &str) -> Result<Option<i32>> {
914    value
915        .map(|value| {
916            i32::try_from(value).map_err(|_| {
917                EngramError::InvalidInput(format!("{field} must fit in a 32-bit integer"))
918            })
919        })
920        .transpose()
921}
922
923fn parse_datetime_or_now(value: Option<String>, field: &str) -> Result<DateTime<Utc>> {
924    parse_optional_datetime(value, field).map(|value| value.unwrap_or_else(Utc::now))
925}
926
927fn parse_optional_datetime(value: Option<String>, field: &str) -> Result<Option<DateTime<Utc>>> {
928    let Some(value) = clean_optional(value) else {
929        return Ok(None);
930    };
931    DateTime::parse_from_rfc3339(&value)
932        .map(|dt| Some(dt.with_timezone(&Utc)))
933        .map_err(|err| EngramError::InvalidInput(format!("{field} must be RFC3339: {err}")))
934}
935
/// Reports whether a metadata key looks sensitive: the key, compared
/// ASCII-case-insensitively, contains any of a fixed list of substrings.
fn sensitive_key(key: &str) -> bool {
    const MARKERS: [&str; 7] = [
        "password",
        "token",
        "secret",
        "api_key",
        "apikey",
        "authorization",
        "cookie",
    ];
    let lowered = key.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(*marker))
}
946
947fn redaction_error(err: impl std::fmt::Display) -> EngramError {
948    EngramError::InvalidInput(format!("operational context redaction failed: {err}"))
949}