oxidized-state 0.3.1

SurrealDB backend for AIVCS state persistence (Layer 0)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
//! SurrealDB schema migrations and initialization
//!
//! This module provides initialization functions to set up all tables
//! with proper constraints, indexes, and ACID guarantees.

use crate::Result;
use surrealdb::engine::any::Any;
use surrealdb::Surreal;
use tracing::{debug, info};

/// Initialize all AIVCS tables in SurrealDB.
///
/// Runs every table-initialization helper in this module, one after another,
/// in the order written below. Each helper is awaited before the next one
/// starts, and the first helper that returns an error aborts the sequence
/// (later helpers are not run).
///
/// Intended to be called on first connection to set up the schema.
///
/// NOTE(review): this function has been described as safe to call multiple
/// times (idempotent). Nothing in this module enforces that: the DDL uses
/// plain `DEFINE ...` statements, and the helpers drop the query response
/// without inspecting it. Whether a repeated call succeeds, overwrites, or
/// silently fails per statement depends on the SurrealDB version in use —
/// confirm before relying on it.
///
/// # Errors
///
/// Returns the first error propagated by any of the helpers.
pub async fn init_schema(db: &Surreal<Any>) -> Result<()> {
    info!("Initializing AIVCS SurrealDB schema");

    // Core VCS tables: commits, snapshots, branches, graph edges, memories, agents.
    init_commits_table(db).await?;
    init_snapshots_table(db).await?;
    init_branches_table(db).await?;
    init_graph_edges_table(db).await?;
    init_memories_table(db).await?;
    init_agents_table(db).await?;

    // Run Ledger tables: runs and their per-run event log.
    init_runs_table(db).await?;
    init_run_events_table(db).await?;

    // Release Registry tables
    init_releases_table(db).await?;

    // CI tables (ci_snapshots, ci_pipelines, ci_runs are defined by one helper).
    init_ci_tables(db).await?;

    // Memory and Decision tables (EPIC5)
    init_decisions_table(db).await?;
    init_memory_provenances_table(db).await?;

    // Only reached when every helper above returned Ok.
    info!("AIVCS schema initialization complete");
    Ok(())
}

/// Define the `runs` table and its indexes.
///
/// Intended record shape. The table is declared `SCHEMALESS` and this DDL
/// contains no `DEFINE FIELD` statements, so the shape below is a convention
/// and is not checked by the statements issued here:
/// ```text
/// TABLE runs {
///   run_id:              STRING
///   spec_digest:         STRING
///   git_sha:             STRING? (optional)
///   agent_name:          STRING
///   tags:                OBJECT
///   status:              STRING (RUNNING | COMPLETED | FAILED | CANCELLED)
///   total_events:        INT
///   final_state_digest:  STRING?
///   duration_ms:         INT
///   success:             BOOL
///   created_at:          DATETIME
///   completed_at:        DATETIME?
/// }
/// ```
///
/// What the DDL itself declares:
/// - table permissions: `create`, `select` and `update` are `FULL`, `delete` is `NONE`
/// - a `UNIQUE` index on `run_id`
/// - plain indexes on `spec_digest`, `agent_name`, `git_sha` and `created_at`
/// - composite indexes on `(spec_digest, created_at)` and `(run_id, status)`
///
/// Not expressed in this DDL (left to application logic per the design notes):
/// - the set of allowed `status` values
/// - `status` transitions: RUNNING → COMPLETED | FAILED | CANCELLED
/// - immutability of completed runs
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_runs_table(db: &Surreal<Any>) -> Result<()> {
    // Table definition followed by its index definitions, sent as one query.
    const RUNS_DDL: &str = r#"
        DEFINE TABLE runs SCHEMALESS
            PERMISSIONS
                FOR create FULL
                FOR select FULL
                FOR update FULL
                FOR delete NONE;

        -- Ensure run_id is unique
        DEFINE INDEX idx_run_id ON TABLE runs COLUMNS run_id UNIQUE;

        -- Index spec_digest for listing runs by agent version
        DEFINE INDEX idx_spec_digest ON TABLE runs COLUMNS spec_digest;

        -- Index agent_name for finding runs by agent
        DEFINE INDEX idx_agent_name ON TABLE runs COLUMNS agent_name;

        -- Index git_sha for correlating runs with git commits
        DEFINE INDEX idx_git_sha ON TABLE runs COLUMNS git_sha;

        -- Index created_at for time-range queries
        DEFINE INDEX idx_created_at ON TABLE runs COLUMNS created_at;

        -- Composite index (spec_digest, created_at) for fast agent version history
        DEFINE INDEX idx_spec_digest_created_at ON TABLE runs COLUMNS spec_digest, created_at;

        -- Composite index (run_id, status) for state queries
        DEFINE INDEX idx_run_id_status ON TABLE runs COLUMNS run_id, status;
    "#;

    debug!("Initializing runs table");
    db.query(RUNS_DDL).await?;
    info!("✓ runs table initialized");
    Ok(())
}

/// Define the `run_events` table and its indexes.
///
/// Intended record shape. The table is declared `SCHEMALESS` with no
/// `DEFINE FIELD` statements, so field types are a convention here:
/// ```text
/// TABLE run_events {
///   run_id:     STRING (refers to runs.run_id by convention)
///   seq:        INT (sequence number within a run)
///   kind:       STRING (event type)
///   payload:    OBJECT (event data)
///   timestamp:  DATETIME
/// }
/// ```
///
/// What the DDL itself declares:
/// - table permissions: `create` and `select` are `FULL`, `update` and `delete` are `NONE`
/// - a `UNIQUE` composite index on `(run_id, seq)`
/// - plain indexes on `run_id` and `kind`
/// - composite indexes on `(run_id, timestamp)` and `(run_id, seq, timestamp)`
///
/// Not expressed in this DDL: that `seq` is 1-indexed and monotonically
/// increasing within a run. Per the design notes this is handled by
/// application logic in `append_event()` (not visible from this file).
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_run_events_table(db: &Surreal<Any>) -> Result<()> {
    // Table definition followed by its index definitions, sent as one query.
    const RUN_EVENTS_DDL: &str = r#"
        DEFINE TABLE run_events SCHEMALESS
            PERMISSIONS
                FOR create FULL
                FOR select FULL
                FOR update NONE
                FOR delete NONE;

        -- Composite unique index: (run_id, seq) ensures no duplicate sequences per run
        -- This is the most critical constraint for event ordering
        DEFINE INDEX idx_run_id_seq ON TABLE run_events COLUMNS run_id, seq UNIQUE;

        -- Index run_id for fast event retrieval by run
        DEFINE INDEX idx_run_id ON TABLE run_events COLUMNS run_id;

        -- Index (run_id, timestamp) for time-ordered queries
        DEFINE INDEX idx_run_id_timestamp ON TABLE run_events COLUMNS run_id, timestamp;

        -- Index event kind for filtering by event type
        DEFINE INDEX idx_kind ON TABLE run_events COLUMNS kind;

        -- Composite index (run_id, seq, timestamp) for sorted event retrieval
        DEFINE INDEX idx_run_id_seq_timestamp ON TABLE run_events COLUMNS run_id, seq, timestamp;
    "#;

    debug!("Initializing run_events table");
    db.query(RUN_EVENTS_DDL).await?;
    info!("✓ run_events table initialized");
    Ok(())
}

/// Define the `releases` table, its fields and its indexes.
///
/// Fields as declared by the DDL (table is `SCHEMAFULL`):
/// ```text
/// TABLE releases {
///   name:           string
///   spec_digest:    string
///   metadata:       object (FLEXIBLE)
///   version_label:  option<string>
///   promoted_by:    option<string>
///   notes:          option<string>
///   created_at:     datetime
/// }
/// ```
///
/// Indexes declared by the DDL (none of them is `UNIQUE`):
/// - `name`
/// - `(name, created_at)`
/// - `spec_digest`
///
/// Semantics described by the design notes, none of which is expressed in
/// this DDL:
/// - release history is append-only (a rollback is a new release entry)
/// - the most recent release by `created_at` is the "current" one
/// - any uniqueness is handled at the application layer; the same
///   `spec_digest` may appear multiple times
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_releases_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const RELEASES_DDL: &str = r#"
        DEFINE TABLE releases SCHEMAFULL;
        DEFINE FIELD name ON releases TYPE string;
        DEFINE FIELD spec_digest ON releases TYPE string;
        DEFINE FIELD metadata ON releases FLEXIBLE TYPE object;
        DEFINE FIELD version_label ON releases TYPE option<string>;
        DEFINE FIELD promoted_by ON releases TYPE option<string>;
        DEFINE FIELD notes ON releases TYPE option<string>;
        DEFINE FIELD created_at ON releases TYPE datetime;

        DEFINE INDEX idx_release_name ON releases FIELDS name;
        DEFINE INDEX idx_release_name_created_at ON releases FIELDS name, created_at;
        DEFINE INDEX idx_spec_digest ON releases FIELDS spec_digest;
    "#;

    debug!("Initializing releases table");
    db.query(RELEASES_DDL).await?;
    info!("✓ releases table initialized");
    Ok(())
}

/// Define the `commits` table, its fields and its indexes.
///
/// The table is `SCHEMAFULL`. `commit_id` is an object with the sub-fields
/// `hash` and `state_hash` (strings) and `logic_hash` and `env_hash`
/// (optional strings). The remaining fields are `parent_ids`
/// (`array<string>`), `message`, `author`, `created_at` (`datetime`) and an
/// optional `branch`.
///
/// Indexes: `UNIQUE` on `commit_id.hash`; plain on `author` and `branch`.
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_commits_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const COMMITS_DDL: &str = r#"
        DEFINE TABLE commits SCHEMAFULL;
        DEFINE FIELD commit_id ON commits TYPE object;
        DEFINE FIELD commit_id.hash ON commits TYPE string;
        DEFINE FIELD commit_id.logic_hash ON commits TYPE option<string>;
        DEFINE FIELD commit_id.state_hash ON commits TYPE string;
        DEFINE FIELD commit_id.env_hash ON commits TYPE option<string>;
        DEFINE FIELD parent_ids ON commits TYPE array<string>;
        DEFINE FIELD message ON commits TYPE string;
        DEFINE FIELD author ON commits TYPE string;
        DEFINE FIELD created_at ON commits TYPE datetime;
        DEFINE FIELD branch ON commits TYPE option<string>;
        DEFINE INDEX idx_commit_hash ON commits FIELDS commit_id.hash UNIQUE;
        DEFINE INDEX idx_author ON commits FIELDS author;
        DEFINE INDEX idx_branch ON commits FIELDS branch;
    "#;

    debug!("Initializing commits table");
    db.query(COMMITS_DDL).await?;
    info!("✓ commits table initialized");
    Ok(())
}

/// Define the `snapshots` table, its fields and its index.
///
/// The table is `SCHEMAFULL` with fields `commit_id` (`string`), `state`
/// (`FLEXIBLE` object), `size_bytes` (`int`) and `created_at` (`datetime`).
/// A `UNIQUE` index on `commit_id` is declared.
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_snapshots_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const SNAPSHOTS_DDL: &str = r#"
        DEFINE TABLE snapshots SCHEMAFULL;
        DEFINE FIELD commit_id ON snapshots TYPE string;
        DEFINE FIELD state ON snapshots FLEXIBLE TYPE object;
        DEFINE FIELD size_bytes ON snapshots TYPE int;
        DEFINE FIELD created_at ON snapshots TYPE datetime;
        DEFINE INDEX idx_snapshot_commit ON snapshots FIELDS commit_id UNIQUE;
    "#;

    debug!("Initializing snapshots table");
    db.query(SNAPSHOTS_DDL).await?;
    info!("✓ snapshots table initialized");
    Ok(())
}

/// Define the `branches` table, its fields and its index.
///
/// The table is `SCHEMAFULL` with fields `name` and `head_commit_id`
/// (`string`), `is_default` (`bool`), and `created_at` and `updated_at`
/// (`datetime`). A `UNIQUE` index on `name` is declared.
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_branches_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const BRANCHES_DDL: &str = r#"
        DEFINE TABLE branches SCHEMAFULL;
        DEFINE FIELD name ON branches TYPE string;
        DEFINE FIELD head_commit_id ON branches TYPE string;
        DEFINE FIELD is_default ON branches TYPE bool;
        DEFINE FIELD created_at ON branches TYPE datetime;
        DEFINE FIELD updated_at ON branches TYPE datetime;
        DEFINE INDEX idx_branch_name ON branches FIELDS name UNIQUE;
    "#;

    debug!("Initializing branches table");
    db.query(BRANCHES_DDL).await?;
    info!("✓ branches table initialized");
    Ok(())
}

/// Define the `graph_edges` table, its fields and its indexes.
///
/// The table is `SCHEMAFULL` with fields `child_id`, `parent_id` and
/// `edge_type` (`string`) and `created_at` (`datetime`). Plain (non-unique)
/// indexes are declared on `child_id` and on `parent_id`.
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_graph_edges_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const GRAPH_EDGES_DDL: &str = r#"
        DEFINE TABLE graph_edges SCHEMAFULL;
        DEFINE FIELD child_id ON graph_edges TYPE string;
        DEFINE FIELD parent_id ON graph_edges TYPE string;
        DEFINE FIELD edge_type ON graph_edges TYPE string;
        DEFINE FIELD created_at ON graph_edges TYPE datetime;
        DEFINE INDEX idx_edge_child ON graph_edges FIELDS child_id;
        DEFINE INDEX idx_edge_parent ON graph_edges FIELDS parent_id;
    "#;

    debug!("Initializing graph_edges table");
    db.query(GRAPH_EDGES_DDL).await?;
    info!("✓ graph_edges table initialized");
    Ok(())
}

/// Define the `memories` table, its fields and its index.
///
/// The table is `SCHEMAFULL` with fields `commit_id`, `key` and `content`
/// (`string`), `embedding` (`option<array>`, element type not constrained by
/// the DDL), `metadata` (`FLEXIBLE` object) and `created_at` (`datetime`).
/// A plain (non-unique) index on `commit_id` is declared.
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_memories_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const MEMORIES_DDL: &str = r#"
        DEFINE TABLE memories SCHEMAFULL;
        DEFINE FIELD commit_id ON memories TYPE string;
        DEFINE FIELD key ON memories TYPE string;
        DEFINE FIELD content ON memories TYPE string;
        DEFINE FIELD embedding ON memories TYPE option<array>;
        DEFINE FIELD metadata ON memories FLEXIBLE TYPE object;
        DEFINE FIELD created_at ON memories TYPE datetime;
        DEFINE INDEX idx_memory_commit ON memories FIELDS commit_id;
    "#;

    debug!("Initializing memories table");
    db.query(MEMORIES_DDL).await?;
    info!("✓ memories table initialized");
    Ok(())
}

/// Define the `agents` table, its fields and its index.
///
/// The table is `SCHEMAFULL` with fields `agent_id`, `name` and `agent_type`
/// (`string`), `config` (`FLEXIBLE` object) and `created_at` (`datetime`).
/// A `UNIQUE` index on `agent_id` is declared.
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_agents_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const AGENTS_DDL: &str = r#"
        DEFINE TABLE agents SCHEMAFULL;
        DEFINE FIELD agent_id ON agents TYPE string;
        DEFINE FIELD name ON agents TYPE string;
        DEFINE FIELD agent_type ON agents TYPE string;
        DEFINE FIELD config ON agents FLEXIBLE TYPE object;
        DEFINE FIELD created_at ON agents TYPE datetime;
        DEFINE INDEX idx_agent_id ON agents FIELDS agent_id UNIQUE;
    "#;

    debug!("Initializing agents table");
    db.query(AGENTS_DDL).await?;
    info!("✓ agents table initialized");
    Ok(())
}

/// Define the three CI tables in a single query: `ci_snapshots`,
/// `ci_pipelines` and `ci_runs`.
///
/// All three tables are `SCHEMAFULL`:
/// - `ci_snapshots`: `digest` and `snapshot_json` (`string`); `UNIQUE` index on `digest`
/// - `ci_pipelines`: `digest` and `pipeline_json` (`string`); `UNIQUE` index on `digest`
/// - `ci_runs`: `run_id`, `snapshot_digest`, `pipeline_digest`, `status` and
///   `run_json` (`string`); `started_at` and `finished_at` (`option<string>`,
///   i.e. stored as strings rather than datetimes); `UNIQUE` index on
///   `run_id` and a plain index on `snapshot_digest`
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_ci_tables(db: &Surreal<Any>) -> Result<()> {
    // Definitions for all three CI tables, sent as one query.
    const CI_DDL: &str = r#"
        -- CI snapshot table (content-addressed by digest)
        DEFINE TABLE ci_snapshots SCHEMAFULL;
        DEFINE FIELD digest ON ci_snapshots TYPE string;
        DEFINE FIELD snapshot_json ON ci_snapshots TYPE string;
        DEFINE INDEX idx_ci_snapshot_digest ON ci_snapshots FIELDS digest UNIQUE;

        -- CI pipeline table (content-addressed by digest)
        DEFINE TABLE ci_pipelines SCHEMAFULL;
        DEFINE FIELD digest ON ci_pipelines TYPE string;
        DEFINE FIELD pipeline_json ON ci_pipelines TYPE string;
        DEFINE INDEX idx_ci_pipeline_digest ON ci_pipelines FIELDS digest UNIQUE;

        -- CI run table (linked by run_id and digests)
        DEFINE TABLE ci_runs SCHEMAFULL;
        DEFINE FIELD run_id ON ci_runs TYPE string;
        DEFINE FIELD snapshot_digest ON ci_runs TYPE string;
        DEFINE FIELD pipeline_digest ON ci_runs TYPE string;
        DEFINE FIELD status ON ci_runs TYPE string;
        DEFINE FIELD run_json ON ci_runs TYPE string;
        DEFINE FIELD started_at ON ci_runs TYPE option<string>;
        DEFINE FIELD finished_at ON ci_runs TYPE option<string>;
        DEFINE INDEX idx_ci_run_id ON ci_runs FIELDS run_id UNIQUE;
        DEFINE INDEX idx_ci_run_snapshot ON ci_runs FIELDS snapshot_digest;
    "#;

    debug!("Initializing CI tables");
    db.query(CI_DDL).await?;
    info!("✓ CI tables initialized");
    Ok(())
}

/// Define the `decisions` table (EPIC5), its fields and its indexes.
///
/// Fields as declared by the DDL (table is `SCHEMAFULL`):
/// ```text
/// TABLE decisions {
///   decision_id:    string
///   commit_id:      string
///   task:           string
///   action:         string
///   rationale:      string
///   alternatives:   array (element type not constrained by the DDL)
///   confidence:     float (no range assertion in the DDL)
///   outcome:        option<string>
///   timestamp:      datetime
///   outcome_at:     option<datetime>
/// }
/// ```
///
/// Indexes declared by the DDL:
/// - `UNIQUE` on `decision_id`
/// - plain on `commit_id`, `task` and `timestamp`
/// - composite on `(commit_id, task)`
///
/// Design notes describe `alternatives` as a list of strings, `confidence`
/// as lying in 0.0-1.0 and `outcome` as a JSON-serialized value; these are
/// conventions for callers and are not checked by the statements here.
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_decisions_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const DECISIONS_DDL: &str = r#"
        DEFINE TABLE decisions SCHEMAFULL;
        DEFINE FIELD decision_id ON decisions TYPE string;
        DEFINE FIELD commit_id ON decisions TYPE string;
        DEFINE FIELD task ON decisions TYPE string;
        DEFINE FIELD action ON decisions TYPE string;
        DEFINE FIELD rationale ON decisions TYPE string;
        DEFINE FIELD alternatives ON decisions TYPE array;
        DEFINE FIELD confidence ON decisions TYPE float;
        DEFINE FIELD outcome ON decisions TYPE option<string>;
        DEFINE FIELD timestamp ON decisions TYPE datetime;
        DEFINE FIELD outcome_at ON decisions TYPE option<datetime>;

        DEFINE INDEX idx_decision_id ON decisions FIELDS decision_id UNIQUE;
        DEFINE INDEX idx_decision_commit ON decisions FIELDS commit_id;
        DEFINE INDEX idx_decision_task ON decisions FIELDS task;
        DEFINE INDEX idx_decision_timestamp ON decisions FIELDS timestamp;
        DEFINE INDEX idx_decision_commit_task ON decisions FIELDS commit_id, task;
    "#;

    debug!("Initializing decisions table");
    db.query(DECISIONS_DDL).await?;
    info!("✓ decisions table initialized");
    Ok(())
}

/// Define the `memory_provenances` table (EPIC5), its fields and its indexes.
///
/// Fields as declared by the DDL (table is `SCHEMAFULL`):
/// ```text
/// TABLE memory_provenances {
///   memory_id:       string
///   source_type:     string
///   source_data:     object (FLEXIBLE)
///   derived_from:    option<string>
///   created_at:      datetime
///   invalidated_at:  option<datetime>
/// }
/// ```
///
/// Indexes declared by the DDL, all plain (non-unique): `memory_id`,
/// `created_at`, `derived_from`, `source_type` and `invalidated_at`.
///
/// Design notes list the expected `source_type` values as `run_trace`,
/// `state_snapshot`, `user_annotation` and `memory_derivation`, describe
/// `source_data` as variant-specific, and describe `derived_from` as a
/// parent `memory_id`. None of this is checked by the statements here.
///
/// The query response is dropped without being inspected.
/// NOTE(review): confirm whether per-statement failures need an explicit
/// check on the response to be noticed.
///
/// # Errors
///
/// Propagates the error produced by awaiting `db.query`.
async fn init_memory_provenances_table(db: &Surreal<Any>) -> Result<()> {
    // Table, field and index definitions, sent as one query.
    const MEMORY_PROVENANCES_DDL: &str = r#"
        DEFINE TABLE memory_provenances SCHEMAFULL;
        DEFINE FIELD memory_id ON memory_provenances TYPE string;
        DEFINE FIELD source_type ON memory_provenances TYPE string;
        DEFINE FIELD source_data ON memory_provenances FLEXIBLE TYPE object;
        DEFINE FIELD derived_from ON memory_provenances TYPE option<string>;
        DEFINE FIELD created_at ON memory_provenances TYPE datetime;
        DEFINE FIELD invalidated_at ON memory_provenances TYPE option<datetime>;

        DEFINE INDEX idx_provenance_memory_id ON memory_provenances FIELDS memory_id;
        DEFINE INDEX idx_provenance_created_at ON memory_provenances FIELDS created_at;
        DEFINE INDEX idx_provenance_derived_from ON memory_provenances FIELDS derived_from;
        DEFINE INDEX idx_provenance_source_type ON memory_provenances FIELDS source_type;
        DEFINE INDEX idx_provenance_invalidated ON memory_provenances FIELDS invalidated_at;
    "#;

    debug!("Initializing memory_provenances table");
    db.query(MEMORY_PROVENANCES_DDL).await?;
    info!("✓ memory_provenances table initialized");
    Ok(())
}

#[cfg(test)]
mod tests {
    // Intentionally empty: this module defines no unit tests.
    // Note: Full integration tests for migrations are in oxidized-state/tests/
    // (a location outside this file). Those tests are described as verifying
    // actual schema creation and constraints.
}