Storage

Trait Storage 

Source
pub trait Storage {
Show 18 methods // Required methods fn insert(&self, event: &Event) -> Result<i64, StorageError>; fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StorageError>; fn gc(&self, retention: Duration) -> Result<usize, StorageError>; fn count_expired(&self, retention: Duration) -> Result<usize, StorageError>; fn count(&self) -> Result<usize, StorageError>; fn list_sessions( &self, query: &SessionQuery, ) -> Result<Vec<Session>, StorageError>; fn get_session( &self, session_id: &str, ) -> Result<Option<Session>, StorageError>; fn get_session_by_key( &self, machine_id: &str, session_id: &str, ) -> Result<Option<Session>, StorageError>; fn get_session_by_pid( &self, pid: i32, ) -> Result<Option<Session>, StorageError>; fn update_session_git_info( &self, session_id: &str, git_info: &GitBranchInfo, ) -> Result<bool, StorageError>; fn update_session_github_repo( &self, session_id: &str, github_repo: &str, ) -> Result<bool, StorageError>; fn upsert_session_git_context( &self, session_id: &str, machine_id: &str, framework: &str, timestamp: i64, local_git_dir: Option<&str>, github_repo: Option<&str>, ) -> Result<(), StorageError>; fn update_session_transcript_path( &self, machine_id: &str, session_id: &str, transcript_path: &str, ) -> Result<bool, StorageError>; fn storage_stats( &self, query: &StorageStatsQuery, ) -> Result<StorageStats, StorageError>; // Provided methods fn get_transcript_position( &self, _path: &Path, ) -> Result<Option<FilePosition>, StorageError> { ... } fn set_transcript_position( &self, _path: &Path, _position: &FilePosition, ) -> Result<(), StorageError> { ... } fn event_exists_by_uuid( &self, _session_id: &str, _uuid: &str, ) -> Result<bool, StorageError> { ... } fn query_transcript_positions( &self, ) -> Result<Vec<(String, FilePosition)>, StorageError> { ... }
}
Expand description

Storage backend trait for mi6 events

All events (including API requests) are stored in a unified events table. API requests are distinguished by having the ApiRequest event type and populated token fields.

Required Methods

Source

fn insert(&self, event: &Event) -> Result<i64, StorageError>

Insert a new event and return its event ID.

Source

fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StorageError>

Query events using the composable EventQuery builder.

This is the primary method for retrieving events. Use EventQuery to specify filters, ordering, and pagination.

Ordering
  • Use order_by_timestamp() for recent events (default, descending)
  • Use order_by_id() for watching new events (ascending by default)
API Requests

Use api_requests_only() to filter to only API request events, or exclude_api_requests() to exclude them.

Examples
// Get 50 most recent events (including API requests)
let events = storage.query(&EventQuery::new().order_by_timestamp().with_limit(50))?;

// Get events for a specific session
let events = storage.query(&EventQuery::new().with_session(Some("sess-123".to_string())).with_limit(100))?;

// Watch for new events after a known ID
let events = storage.query(&EventQuery::new().order_by_id().with_after_id(last_id).with_limit(10))?;

// Get only API request events
let api_events = storage.query(&EventQuery::new().api_requests_only().with_limit(50))?;
Source

fn gc(&self, retention: Duration) -> Result<usize, StorageError>

Delete events older than the retention period and return the number of events deleted.

Source

fn count_expired(&self, retention: Duration) -> Result<usize, StorageError>

Count events older than the retention period without deleting them (used for a dry run of gc).

Source

fn count(&self) -> Result<usize, StorageError>

Count the total number of stored events.

Source

fn list_sessions( &self, query: &SessionQuery, ) -> Result<Vec<Session>, StorageError>

List sessions matching the query criteria.

Sessions contain denormalized metadata that is incrementally updated when events are inserted. This enables single-query monitoring by returning all session data in one call.

Examples
// Get all active sessions, most recent first
let sessions = storage.list_sessions(&SessionQuery::new().active_only())?;

// Get sessions for a specific framework
let sessions = storage.list_sessions(&SessionQuery::new().with_framework("claude"))?;

// Get 10 most recently active sessions
let sessions = storage.list_sessions(
    &SessionQuery::new()
        .with_order_by(SessionOrder::LastActivity)
        .with_limit(10)
)?;
Source

fn get_session(&self, session_id: &str) -> Result<Option<Session>, StorageError>

Get a single session by ID.

This queries by session_id alone. If multiple sessions exist with the same session_id (on different machines), returns the most recently active one.

For an exact lookup when the machine_id is known, use get_session_by_key instead.

Returns Ok(None) if no session with the given ID exists.

Source

fn get_session_by_key( &self, machine_id: &str, session_id: &str, ) -> Result<Option<Session>, StorageError>

Get a single session by composite key (machine_id, session_id).

This is the exact lookup using the composite primary key, which ensures uniqueness across multiple machines.

Returns Ok(None) if no session with the given key exists.

Source

fn get_session_by_pid(&self, pid: i32) -> Result<Option<Session>, StorageError>

Get a session by its process ID.

Returns the most recently active session associated with the given PID. This is useful for looking up sessions when only the PID is known (e.g., from process monitoring tools).

Returns Ok(None) if no session with the given PID exists.

Source

fn update_session_git_info( &self, session_id: &str, git_info: &GitBranchInfo, ) -> Result<bool, StorageError>

Update git branch information for a session.

This updates the session’s git_branch, git_pr_number, and git_issue_number fields. It is typically called:

  • On SessionStart, to capture the initial branch
  • After detecting a git branch-changing command in PostToolUse

Returns Ok(false) if the session doesn’t exist.

Source

fn update_session_github_repo( &self, session_id: &str, github_repo: &str, ) -> Result<bool, StorageError>

Update the GitHub repository for a session.

This updates the session’s github_repo field if it’s currently empty. Uses COALESCE semantics to avoid overwriting existing values.

It is typically called:

  • On SessionStart, to capture the initial repo from git remote
  • When a worktree is detected

Returns Ok(false) if the session doesn’t exist.

Source

fn upsert_session_git_context( &self, session_id: &str, machine_id: &str, framework: &str, timestamp: i64, local_git_dir: Option<&str>, github_repo: Option<&str>, ) -> Result<(), StorageError>

Upsert git directory path and GitHub repository for a session.

This is called on every hook event BEFORE the event is inserted, to ensure git context is updated first. This allows branch parsing (which happens during event insert) to correctly set github_issue/github_pr for the current repo.

When github_repo changes, github_issue and github_pr are cleared because issue/PR numbers are only meaningful in the context of a specific repository.

Creates the session if it doesn’t exist (UPSERT semantics).

This is safe to call frequently (~50µs with direct file access).

Source

fn update_session_transcript_path( &self, machine_id: &str, session_id: &str, transcript_path: &str, ) -> Result<bool, StorageError>

Update the transcript path for a session.

This updates the session’s transcript_path field if it’s currently empty. Uses COALESCE semantics to avoid overwriting existing values.

This is primarily used for Codex sessions where the transcript path is not available from hooks but is known from session file scanning.

Returns Ok(false) if the session doesn’t exist.

Source

fn storage_stats( &self, query: &StorageStatsQuery, ) -> Result<StorageStats, StorageError>

Get aggregated statistics across all sessions.

Returns session counts, token totals, costs, and API request counts computed at the database level via a single aggregation query. This is more efficient than iterating over individual sessions in application code.

Examples
// Get stats for all sessions
let stats = storage.storage_stats(&StorageStatsQuery::new())?;
println!("Total sessions: {}", stats.session_count);
println!("Active sessions: {}", stats.active_session_count);
println!("Total cost: ${:.2}", stats.total_cost_usd);

// Get stats for only active sessions
let stats = storage.storage_stats(&StorageStatsQuery::new().active_only())?;

// Get stats for a specific framework
let stats = storage.storage_stats(&StorageStatsQuery::new().with_framework("claude"))?;

Provided Methods

Source

fn get_transcript_position( &self, _path: &Path, ) -> Result<Option<FilePosition>, StorageError>

Get the last scanned position for a transcript file.

Returns Ok(None) if the file has never been scanned. Default implementation returns Ok(None).

Source

fn set_transcript_position( &self, _path: &Path, _position: &FilePosition, ) -> Result<(), StorageError>

Set the scanned position for a transcript file.

Default implementation does nothing.

Source

fn event_exists_by_uuid( &self, _session_id: &str, _uuid: &str, ) -> Result<bool, StorageError>

Check if an event with the given UUID already exists for a session.

Used for deduplication when scanning transcripts. The default implementation always returns Ok(false), so backends that do not override it perform no UUID-based deduplication.

Source

fn query_transcript_positions( &self, ) -> Result<Vec<(String, FilePosition)>, StorageError>

Query all transcript file positions.

Returns a list of (path, position) tuples for all tracked transcript files. Default implementation returns an empty list.

Implementations on Foreign Types

Source§

impl<T: Storage> Storage for Arc<T>

Blanket implementation of Storage for Arc<T> where T: Storage.

This enables shared ownership of storage instances. Wrap your storage in an Arc and it can be used anywhere a Storage implementation is expected.

Use Cases

  • Sharing storage across multiple owned structs
  • Single-threaded async execution (e.g., tasks spawned on a tokio::task::LocalSet)
  • Multi-threaded access when T: Send + Sync (see note below)

Thread Safety Note

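For multi-threaded use, T must be Send + Sync. SqliteStorage is not Sync (because rusqlite::Connection is not Sync), so Arc<SqliteStorage> cannot be shared across threads directly. For multi-threaded scenarios with SqliteStorage, use Arc<Mutex<SqliteStorage>> or a connection pool. Note that Arc<Mutex<SqliteStorage>> does not itself implement Storage (only Arc<T> where T: Storage does). Lock the mutex and call the Storage methods on the guard.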

Example

use mi6_storage_sqlite::SqliteStorage;
use mi6_core::{Storage, EventQuery};
use std::sync::Arc;

// Create shared storage
let storage = Arc::new(SqliteStorage::open(path)?);

// Clone for use in multiple places (single-threaded)
let storage_for_queries = Arc::clone(&storage);
let storage_for_inserts = Arc::clone(&storage);

// Both references can use Storage trait methods
let events = storage_for_queries.query(&EventQuery::new().with_limit(10))?;
storage_for_inserts.insert(&event)?;
Source§

fn insert(&self, event: &Event) -> Result<i64, StorageError>

Source§

fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StorageError>

Source§

fn gc(&self, retention: Duration) -> Result<usize, StorageError>

Source§

fn count_expired(&self, retention: Duration) -> Result<usize, StorageError>

Source§

fn count(&self) -> Result<usize, StorageError>

Source§

fn list_sessions( &self, query: &SessionQuery, ) -> Result<Vec<Session>, StorageError>

Source§

fn get_session(&self, session_id: &str) -> Result<Option<Session>, StorageError>

Source§

fn get_session_by_key( &self, machine_id: &str, session_id: &str, ) -> Result<Option<Session>, StorageError>

Source§

fn get_session_by_pid(&self, pid: i32) -> Result<Option<Session>, StorageError>

Source§

fn update_session_git_info( &self, session_id: &str, git_info: &GitBranchInfo, ) -> Result<bool, StorageError>

Source§

fn update_session_github_repo( &self, session_id: &str, github_repo: &str, ) -> Result<bool, StorageError>

Source§

fn upsert_session_git_context( &self, session_id: &str, machine_id: &str, framework: &str, timestamp: i64, local_git_dir: Option<&str>, github_repo: Option<&str>, ) -> Result<(), StorageError>

Source§

fn update_session_transcript_path( &self, machine_id: &str, session_id: &str, transcript_path: &str, ) -> Result<bool, StorageError>

Source§

fn storage_stats( &self, query: &StorageStatsQuery, ) -> Result<StorageStats, StorageError>

Source§

fn get_transcript_position( &self, path: &Path, ) -> Result<Option<FilePosition>, StorageError>

Source§

fn set_transcript_position( &self, path: &Path, position: &FilePosition, ) -> Result<(), StorageError>

Source§

fn event_exists_by_uuid( &self, session_id: &str, uuid: &str, ) -> Result<bool, StorageError>

Source§

fn query_transcript_positions( &self, ) -> Result<Vec<(String, FilePosition)>, StorageError>

Implementors