data-connector 2.2.0

Storage backends for conversations and responses
Documentation

data-connector

Version: 2.0.0 | License: Apache-2.0

Pluggable storage abstraction for the Shepherd Model Gateway (SMG). Provides trait-based backends for persisting conversations, conversation items, and responses. Backend selection is a runtime decision driven by configuration, allowing the same application binary to target in-memory storage during development and a production database in deployment.

Architecture

Core Traits

The crate defines three async traits in core.rs. Every backend implements all three:

Trait Responsibility
ConversationStorage CRUD for conversation records (create, get, update, delete)
ConversationItemStorage Item creation, linking items to conversations, cursor-based listing, deletion of links
ResponseStorage Store/retrieve responses, walk response chains, index and query by safety identifier

All traits are Send + Sync + 'static and use async_trait so they can be held behind Arc<dyn Trait>.

Factory Pattern

create_storage() is the single entry point. It accepts a StorageFactoryConfig (backend selector plus optional per-backend configs) and returns a StorageTuple:

pub type StorageTuple = (
    Arc<dyn ResponseStorage>,
    Arc<dyn ConversationStorage>,
    Arc<dyn ConversationItemStorage>,
);

Supported Backends

Backend Variant Description
Memory HistoryBackend::Memory In-process HashMap/BTreeMap storage. Default. No persistence across restarts. Suitable for development and testing.
None / NoOp HistoryBackend::None Accepts all writes silently, returns empty on reads. Use when persistence is intentionally disabled.
PostgreSQL HistoryBackend::Postgres Production backend using tokio-postgres with deadpool connection pooling. Fully async. Tables are auto-created on first connection.
Redis HistoryBackend::Redis Production backend using deadpool-redis. Supports optional TTL-based data retention (retention_days).
Oracle ATP HistoryBackend::Oracle Enterprise backend using the synchronous oracle crate. Async bridging is handled via tokio::task::spawn_blocking. Tables are auto-created on initialization.

Usage

use data_connector::{
    create_storage, StorageFactoryConfig, HistoryBackend,
    NewConversation, NewConversationItem, StoredResponse,
};
use serde_json::json;
use std::sync::Arc;

// Build factory config -- memory backend needs no extra configuration.
let config = StorageFactoryConfig {
    backend: &HistoryBackend::Memory,
    oracle: None,
    postgres: None,
    redis: None,
    hook: None,
};

let (responses, conversations, items) = create_storage(config).await.unwrap();

// Create a conversation
let conv = conversations
    .create_conversation(NewConversation { id: None, metadata: None })
    .await
    .unwrap();

// Store a response
let mut resp = StoredResponse::new(None);
resp.input = json!([{"role": "user", "content": "Hello"}]);
let resp_id = responses.store_response(resp).await.unwrap();

// Create and link a conversation item
let item = items
    .create_item(NewConversationItem {
        id: None,
        response_id: Some(resp_id.0.clone()),
        item_type: "message".to_string(),
        role: Some("user".to_string()),
        content: json!([]),
        status: Some("completed".to_string()),
    })
    .await
    .unwrap();

items.link_item(&conv.id, &item.id, chrono::Utc::now()).await.unwrap();

Configuration

Backend selection is controlled by the HistoryBackend enum ("memory", "none", "oracle", "postgres", "redis" when deserialized from JSON/YAML). Each database backend has a dedicated config struct.

PostgresConfig

Field Type Description
db_url String Connection URL (postgres://user:pass@host:port/dbname). Validated for scheme, host, and database name.
pool_max usize Maximum connections in the deadpool pool (default helper: 16). Must be > 0.
schema Option<SchemaConfig> Optional schema customization. See Schema Configuration.

Call validate() to check the URL before use.

RedisConfig

Field Type Default Description
url String -- Connection URL (redis:// or rediss://).
pool_max usize 16 Maximum pool connections.
retention_days Option<u64> Some(30) TTL in days for stored data. None disables expiration.
schema Option<SchemaConfig> None Optional schema customization. See Schema Configuration.

Call validate() to check the URL before use.

OracleConfig

Field Type Default Description
wallet_path Option<String> None Path to ATP wallet / TLS config directory.
connect_descriptor String -- DSN, e.g. tcps://host:port/service.
external_auth bool false Use OS/external authentication.
username String -- Database username.
password String -- Database password (redacted in Debug output).
pool_min usize 1 Minimum pool connections.
pool_max usize 16 Maximum pool connections.
pool_timeout_secs u64 30 Connection acquisition timeout in seconds.
schema Option<SchemaConfig> None Optional schema customization. See Schema Configuration.

Schema Configuration

All three database backends (Oracle, Postgres, Redis) accept an optional SchemaConfig that lets you customize table names and column names without modifying source code. When schema is omitted, all backends use their default table and column names — zero behavioral change.

postgres:
  db_url: "postgres://user:pass@localhost:5432/mydb"
  pool_max: 16

  schema:
    owner: "myschema"           # Oracle: schema prefix (MYSCHEMA."TABLE")
                                # Redis: key prefix ("myschema:conversation:{id}")
                                # Postgres: ignored (use search_path for schema control)
    version: 2                  # Skip migrations 1–2 (database already at v2)
    auto_migrate: true          # Opt in to automatic migration (default: false)

    conversations:
      table: "my_conversations" # Overrides default "conversations"
      columns:
        id: "conv_id"           # Overrides column name "id" -> "conv_id"
        metadata: "conv_meta"   # Overrides column name "metadata" -> "conv_meta"

    responses:
      table: "my_responses"
      columns:
        safety_identifier: "user_identifier"

    conversation_items:
      table: "my_items"

    conversation_item_links:
      table: "my_links"

SchemaConfig has two types:

Type Fields Purpose
SchemaConfig owner, version, auto_migrate, conversations, responses, conversation_items, conversation_item_links Top-level config with an optional owner/prefix, migration control, and per-table settings
TableConfig table, columns Per-table config: physical table name and a map of logical-to-physical column name overrides

Key behaviors:

  • col(field) returns the physical column name for a logical field name. If no override is configured, the logical name is returned unchanged.
  • qualified_table(owner) returns OWNER."TABLE" when an owner is set (used by Oracle), or just the table name otherwise.
  • Validation runs at startup. All identifiers must match [a-zA-Z0-9_]+. Invalid identifiers are rejected before any queries execute.
  • Redis: Only owner (key prefix) and columns (hash field names) affect Redis behavior. The table field is ignored for Redis key patterns — keys always use hardcoded entity names (conversation, item, response).

Storage Hooks

Hooks let you inject custom logic before and after storage operations without modifying backend code. Use cases include audit logging, multi-tenancy field population, PII redaction, and custom validation.

StorageHook Trait

Implement the StorageHook trait (in hooks.rs):

#[async_trait]
pub trait StorageHook: Send + Sync + 'static {
    async fn before(
        &self,
        operation: StorageOperation,
        context: Option<&RequestContext>,
        payload: &Value,
    ) -> Result<BeforeHookResult, HookError>;

    async fn after(
        &self,
        operation: StorageOperation,
        context: Option<&RequestContext>,
        payload: &Value,
        result: &Value,
        extra: &ExtraColumns,
    ) -> Result<ExtraColumns, HookError>;
}
  • before() returning Continue(extra_columns) proceeds with the operation. The extra columns map is forwarded to the backend for persistence.
  • before() returning Reject(reason) aborts the operation with an error.
  • before() returning Err(_) logs a warning and continues (non-fatal).
  • after() receives the result and extra columns from before(). It can return modified extra columns for the caller.

Wiring a Hook

Pass the hook to create_storage() via StorageFactoryConfig:

let hook = Arc::new(MyHook::new());
let config = StorageFactoryConfig {
    backend: &HistoryBackend::Postgres,
    postgres: Some(pg_config),
    hook: Some(hook),
    ..Default::default()
};
let (responses, conversations, items) = create_storage(config).await?;

Request Context

RequestContext is a per-request key-value bag (populated from HTTP headers, middleware, etc.) made available to hooks via tokio task-local storage:

use data_connector::context::{with_request_context, RequestContext};

let mut ctx = RequestContext::new();
ctx.set("tenant_id", "acme-corp");
ctx.set("user_id", "user-42");

// Run storage operations with context available to hooks
with_request_context(ctx, async {
    conversations.create_conversation(input).await
}).await;

Extra Columns

Extra columns let hooks persist custom fields alongside core data. They are defined in SchemaConfig and populated by hook before() calls.

schema:
  responses:
    extra_columns:
      TENANT_ID:
        sql_type: "VARCHAR(128)"
      STORED_BY:
        sql_type: "VARCHAR(128)"
        default_value: "system"   # Used when hook doesn't provide a value
  conversations:
    extra_columns:
      TENANT_ID:
        sql_type: "VARCHAR(128)"
      CREATED_BY:
        sql_type: "VARCHAR(128)"

Value resolution order for each extra column on write:

  1. Hook-provided value from ExtraColumns (via before() result)
  2. default_value from ColumnDef in schema config
  3. NULL (if neither is available)

Extra columns are write-side enrichment (audit trail, tenant ID, etc.). They are included in DDL for schema creation and in INSERT statements, but are not read back into StoredResponse/Conversation/ConversationItem structs on SELECT.

Skip Columns

Skip columns let you omit core columns from DDL, INSERT, and SELECT statements. This is useful when migrating to a schema that doesn't have all default columns.

schema:
  responses:
    skip_columns:
      - raw_response
      - safety_identifier

Skipped columns use default values when reading (e.g. None for optional fields, empty collections for lists/maps, Value::Null for JSON).

WASM Storage Hooks

For sandboxed, language-agnostic hooks, use the WASM Component Model bridge. The WIT interface is in wasm/src/interface/storage/storage-hooks.wit and the Rust bridge is WasmStorageHook in the smg-wasm crate (behind the storage-hooks feature flag).

use smg_wasm::WasmStorageHook;  // requires: smg-wasm with "storage-hooks" feature

let wasm_bytes = std::fs::read("path/to/storage_hook.component.wasm")?;
let hook = WasmStorageHook::new(&wasm_bytes)?;
// Use Arc::new(hook) in StorageFactoryConfig

See examples/wasm/wasm-guest-storage-hook/ for a complete guest example demonstrating multi-tenancy and audit trail extra columns.

Data Model

Conversations

A Conversation has an id (ConversationId), a created_at timestamp, and optional JSON metadata. Conversations act as containers for ordered sets of conversation items.

Conversation Items

A ConversationItem represents a single turn or event within a conversation. Items are created independently and then linked to a conversation via link_item(). Listing uses cursor-based pagination (ListParams with limit, order, and an optional after cursor).

Fields: id, response_id (optional back-reference), item_type, role, content (JSON), status, created_at.

Responses

A StoredResponse captures a complete model interaction: input, output, instructions, tool calls, metadata, model identifier, and raw response payload. Responses support chaining via previous_response_id and can be queried by safety_identifier for content moderation workflows. The ResponseChain struct reconstructs the chronological sequence of related responses.

ID Generation

ID Type Format Example
ConversationId conv_ + 50 random hex chars (25 bytes) conv_a1b2c3...
ConversationItemId {prefix}_ + 50 random hex chars msg_d4e5f6...
ResponseId ULID (26 chars, lexicographically sortable, millisecond precision) 01ARZ3NDEKTSV4RRFFQ69G5FAV

Item type prefixes:

item_type Prefix
message msg
reasoning rs
mcp_call mcp
mcp_list_tools mcpl
function_call fc
(other) first 3 chars of the type, or itm if empty

Database Schema

All database backends auto-create their schemas on first connection. The following default table names are used (configurable via SchemaConfig):

Default Table Purpose
conversations Conversation records with metadata
conversation_items Individual items (messages, tool calls, etc.)
conversation_item_links Join table linking items to conversations with ordering (added_at)
responses Stored model responses with chaining and safety identifier indexing

PostgreSQL additionally creates an index on conversation_item_links(conversation_id, added_at) for efficient cursor-based listing.

Column names within each table can also be overridden via SchemaConfig. If you rename a column in config, the corresponding database column must already exist with that name.

Schema Versioning

The data connector includes a built-in migration system that tracks applied schema changes in a _schema_versions table. Each backend (Oracle, Postgres) defines its own migration list with backend-specific DDL. Redis has no structural schema and does not use migrations.

Safe by default: auto_migrate defaults to false. When pending migrations are detected, startup fails with the exact SQL statements needed so you can review and apply them manually. Set auto_migrate: true to opt in to automatic migration.

On startup:

  1. Tables are created if they don't exist (CREATE TABLE / CREATE TABLE IF NOT EXISTS)
  2. The _schema_versions tracking table is created
  3. Pending migrations are checked:
    • If auto_migrate: true → migrations are applied automatically
    • If auto_migrate: false (default) → startup fails with actionable SQL if migrations are pending

Current migrations:

Version Description
1 Add safety_identifier column to responses
2 Remove legacy user_id column from responses

Controlling migrations

Config field Type Default Description
auto_migrate bool false Set to true to apply migrations automatically on startup. When false, startup fails with the exact SQL if migrations are pending.
version u32 (optional) None Starting version — migrations up to this number are skipped. Use when your database is already at a known version.

Example: opt in to automatic migration:

oracle:
  schema:
    auto_migrate: true

Example: database already at version 2, skip those migrations:

oracle:
  schema:
    version: 2
    auto_migrate: true

Concurrency safety

  • Postgres: Uses pg_advisory_lock to serialize migrations across concurrent application instances.
  • Oracle: DDL statements use PL/SQL exception handling for idempotency. Duplicate version records from concurrent instances are detected and skipped (ORA-00001).

Testing

Run the unit tests (Memory and NoOp backends, config validation, ID generation):

cargo test -p data-connector

Integration tests against live PostgreSQL, Redis, or Oracle instances require the corresponding backend to be running and configured.