data-connector
Version: 2.0.0 | License: Apache-2.0
Pluggable storage abstraction for the Shepherd Model Gateway (SMG). Provides trait-based backends for persisting conversations, conversation items, and responses. Backend selection is a runtime decision driven by configuration, allowing the same application binary to target in-memory storage during development and a production database in deployment.
Architecture
Core Traits
The crate defines three async traits in core.rs. Every backend implements all
three:
| Trait | Responsibility |
|---|---|
ConversationStorage |
CRUD for conversation records (create, get, update, delete) |
ConversationItemStorage |
Item creation, linking items to conversations, cursor-based listing, deletion of links |
ResponseStorage |
Store/retrieve responses, walk response chains, index and query by safety identifier |
All traits are Send + Sync + 'static and use async_trait so they can be
held behind Arc<dyn Trait>.
Factory Pattern
create_storage() is the single entry point. It accepts a
StorageFactoryConfig (backend selector plus optional per-backend configs) and
returns a StorageTuple:
pub type StorageTuple = ;
Supported Backends
| Backend | Variant | Description |
|---|---|---|
| Memory | HistoryBackend::Memory |
In-process HashMap/BTreeMap storage. Default. No persistence across restarts. Suitable for development and testing. |
| None / NoOp | HistoryBackend::None |
Accepts all writes silently, returns empty on reads. Use when persistence is intentionally disabled. |
| PostgreSQL | HistoryBackend::Postgres |
Production backend using tokio-postgres with deadpool connection pooling. Fully async. Tables are auto-created on first connection. |
| Redis | HistoryBackend::Redis |
Production backend using deadpool-redis. Supports optional TTL-based data retention (retention_days). |
| Oracle ATP | HistoryBackend::Oracle |
Enterprise backend using the synchronous oracle crate. Async bridging is handled via tokio::task::spawn_blocking. Tables are auto-created on initialization. |
Usage
use ;
use json;
use Arc;
// Build factory config -- memory backend needs no extra configuration.
let config = StorageFactoryConfig ;
let = create_storage.await.unwrap;
// Create a conversation
let conv = conversations
.create_conversation
.await
.unwrap;
// Store a response
let mut resp = new;
resp.input = json!;
let resp_id = responses.store_response.await.unwrap;
// Create and link a conversation item
let item = items
.create_item
.await
.unwrap;
items.link_item.await.unwrap;
Configuration
Backend selection is controlled by the HistoryBackend enum
("memory", "none", "oracle", "postgres", "redis" when deserialized
from JSON/YAML). Each database backend has a dedicated config struct.
PostgresConfig
| Field | Type | Description |
|---|---|---|
db_url |
String |
Connection URL (postgres://user:pass@host:port/dbname). Validated for scheme, host, and database name. |
pool_max |
usize |
Maximum connections in the deadpool pool (default helper: 16). Must be > 0. |
schema |
Option<SchemaConfig> |
Optional schema customization. See Schema Configuration. |
Call validate() to check the URL before use.
RedisConfig
| Field | Type | Default | Description |
|---|---|---|---|
url |
String |
-- | Connection URL (redis:// or rediss://). |
pool_max |
usize |
16 | Maximum pool connections. |
retention_days |
Option<u64> |
Some(30) |
TTL in days for stored data. None disables expiration. |
schema |
Option<SchemaConfig> |
None |
Optional schema customization. See Schema Configuration. |
Call validate() to check the URL before use.
OracleConfig
| Field | Type | Default | Description |
|---|---|---|---|
wallet_path |
Option<String> |
None |
Path to ATP wallet / TLS config directory. |
connect_descriptor |
String |
-- | DSN, e.g. tcps://host:port/service. |
external_auth |
bool |
false |
Use OS/external authentication. |
username |
String |
-- | Database username. |
password |
String |
-- | Database password (redacted in Debug output). |
pool_min |
usize |
1 | Minimum pool connections. |
pool_max |
usize |
16 | Maximum pool connections. |
pool_timeout_secs |
u64 |
30 | Connection acquisition timeout in seconds. |
schema |
Option<SchemaConfig> |
None |
Optional schema customization. See Schema Configuration. |
Schema Configuration
All three database backends (Oracle, Postgres, Redis) accept an optional
SchemaConfig that lets you customize table names and column names without
modifying source code. When schema is omitted, all backends use their
default table and column names — zero behavioral change.
postgres:
db_url: "postgres://user:pass@localhost:5432/mydb"
pool_max: 16
schema:
owner: "myschema" # Oracle: schema prefix (MYSCHEMA."TABLE")
# Redis: key prefix ("myschema:conversation:{id}")
# Postgres: ignored (use search_path for schema control)
version: 2 # Skip migrations 1–2 (database already at v2)
auto_migrate: true # Opt in to automatic migration (default: false)
conversations:
table: "my_conversations" # Overrides default "conversations"
columns:
id: "conv_id" # Overrides column name "id" -> "conv_id"
metadata: "conv_meta" # Overrides column name "metadata" -> "conv_meta"
responses:
table: "my_responses"
columns:
safety_identifier: "user_identifier"
conversation_items:
table: "my_items"
conversation_item_links:
table: "my_links"
SchemaConfig has two types:
| Type | Fields | Purpose |
|---|---|---|
SchemaConfig |
owner, version, auto_migrate, conversations, responses, conversation_items, conversation_item_links |
Top-level config with an optional owner/prefix, migration control, and per-table settings |
TableConfig |
table, columns |
Per-table config: physical table name and a map of logical-to-physical column name overrides |
Key behaviors:
col(field)returns the physical column name for a logical field name. If no override is configured, the logical name is returned unchanged.qualified_table(owner)returnsOWNER."TABLE"when an owner is set (used by Oracle), or just the table name otherwise.- Validation runs at startup. All identifiers must match
[a-zA-Z0-9_]+. Invalid identifiers are rejected before any queries execute. - Redis: Only
owner(key prefix) andcolumns(hash field names) affect Redis behavior. Thetablefield is ignored for Redis key patterns — keys always use hardcoded entity names (conversation,item,response).
Storage Hooks
Hooks let you inject custom logic before and after storage operations without modifying backend code. Use cases include audit logging, multi-tenancy field population, PII redaction, and custom validation.
StorageHook Trait
Implement the StorageHook trait (in hooks.rs):
before()returningContinue(extra_columns)proceeds with the operation. The extra columns map is forwarded to the backend for persistence.before()returningReject(reason)aborts the operation with an error.before()returningErr(_)logs a warning and continues (non-fatal).after()receives the result and extra columns frombefore(). It can return modified extra columns for the caller.
Wiring a Hook
Pass the hook to create_storage() via StorageFactoryConfig:
let hook = new;
let config = StorageFactoryConfig ;
let = create_storage.await?;
Request Context
RequestContext is a per-request key-value bag (populated from HTTP headers,
middleware, etc.) made available to hooks via tokio task-local storage:
use ;
let mut ctx = new;
ctx.set;
ctx.set;
// Run storage operations with context available to hooks
with_request_context.await;
Extra Columns
Extra columns let hooks persist custom fields alongside core data. They are
defined in SchemaConfig and populated by hook before() calls.
schema:
responses:
extra_columns:
TENANT_ID:
sql_type: "VARCHAR(128)"
STORED_BY:
sql_type: "VARCHAR(128)"
default_value: "system" # Used when hook doesn't provide a value
conversations:
extra_columns:
TENANT_ID:
sql_type: "VARCHAR(128)"
CREATED_BY:
sql_type: "VARCHAR(128)"
Value resolution order for each extra column on write:
- Hook-provided value from
ExtraColumns(viabefore()result) default_valuefromColumnDefin schema configNULL(if neither is available)
Extra columns are write-side enrichment (audit trail, tenant ID, etc.). They
are included in DDL for schema creation and in INSERT statements, but are
not read back into StoredResponse/Conversation/ConversationItem
structs on SELECT.
Skip Columns
Skip columns let you omit core columns from DDL, INSERT, and SELECT statements. This is useful when migrating to a schema that doesn't have all default columns.
schema:
responses:
skip_columns:
- raw_response
- safety_identifier
Skipped columns use default values when reading (e.g. None for optional
fields, empty collections for lists/maps, Value::Null for JSON).
WASM Storage Hooks
For sandboxed, language-agnostic hooks, use the WASM Component Model bridge.
The WIT interface is in wasm/src/interface/storage/storage-hooks.wit and
the Rust bridge is WasmStorageHook in the smg-wasm crate (behind the
storage-hooks feature flag).
use WasmStorageHook; // requires: smg-wasm with "storage-hooks" feature
let wasm_bytes = read?;
let hook = new?;
// Use Arc::new(hook) in StorageFactoryConfig
See examples/wasm/wasm-guest-storage-hook/ for a complete guest example
demonstrating multi-tenancy and audit trail extra columns.
Data Model
Conversations
A Conversation has an id (ConversationId), a created_at timestamp, and
optional JSON metadata. Conversations act as containers for ordered sets of
conversation items.
Conversation Items
A ConversationItem represents a single turn or event within a conversation.
Items are created independently and then linked to a conversation via
link_item(). Listing uses cursor-based pagination (ListParams with limit,
order, and an optional after cursor).
Fields: id, response_id (optional back-reference), item_type, role,
content (JSON), status, created_at.
Responses
A StoredResponse captures a complete model interaction: input, output,
instructions, tool calls, metadata, model identifier, and raw response payload.
Responses support chaining via previous_response_id and can be queried by
safety_identifier for content moderation workflows. The ResponseChain
struct reconstructs the chronological sequence of related responses.
ID Generation
| ID Type | Format | Example |
|---|---|---|
ConversationId |
conv_ + 50 random hex chars (25 bytes) |
conv_a1b2c3... |
ConversationItemId |
{prefix}_ + 50 random hex chars |
msg_d4e5f6... |
ResponseId |
ULID (26 chars, lexicographically sortable, millisecond precision) | 01ARZ3NDEKTSV4RRFFQ69G5FAV |
Item type prefixes:
item_type |
Prefix |
|---|---|
message |
msg |
reasoning |
rs |
mcp_call |
mcp |
mcp_list_tools |
mcpl |
function_call |
fc |
| (other) | first 3 chars of the type, or itm if empty |
Database Schema
All database backends auto-create their schemas on first connection. The
following default table names are used (configurable via SchemaConfig):
| Default Table | Purpose |
|---|---|
conversations |
Conversation records with metadata |
conversation_items |
Individual items (messages, tool calls, etc.) |
conversation_item_links |
Join table linking items to conversations with ordering (added_at) |
responses |
Stored model responses with chaining and safety identifier indexing |
PostgreSQL additionally creates an index on
conversation_item_links(conversation_id, added_at) for efficient
cursor-based listing.
Column names within each table can also be overridden via SchemaConfig.
If you rename a column in config, the corresponding database column must
already exist with that name.
Schema Versioning
The data connector includes a built-in migration system that tracks applied
schema changes in a _schema_versions table. Each backend (Oracle, Postgres)
defines its own migration list with backend-specific DDL. Redis has no
structural schema and does not use migrations.
Safe by default: auto_migrate defaults to false. When pending
migrations are detected, startup fails with the exact SQL statements
needed so you can review and apply them manually. Set auto_migrate: true
to opt in to automatic migration.
On startup:
- Tables are created if they don't exist (
CREATE TABLE/CREATE TABLE IF NOT EXISTS) - The
_schema_versionstracking table is created - Pending migrations are checked:
- If
auto_migrate: true→ migrations are applied automatically - If
auto_migrate: false(default) → startup fails with actionable SQL if migrations are pending
- If
Current migrations:
| Version | Description |
|---|---|
| 1 | Add safety_identifier column to responses |
| 2 | Remove legacy user_id column from responses |
Controlling migrations
| Config field | Type | Default | Description |
|---|---|---|---|
auto_migrate |
bool |
false |
Set to true to apply migrations automatically on startup. When false, startup fails with the exact SQL if migrations are pending. |
version |
u32 (optional) |
None |
Starting version — migrations up to this number are skipped. Use when your database is already at a known version. |
Example: opt in to automatic migration:
oracle:
schema:
auto_migrate: true
Example: database already at version 2, skip those migrations:
oracle:
schema:
version: 2
auto_migrate: true
Concurrency safety
- Postgres: Uses
pg_advisory_lockto serialize migrations across concurrent application instances. - Oracle: DDL statements use PL/SQL exception handling for idempotency. Duplicate version records from concurrent instances are detected and skipped (ORA-00001).
Testing
Run the unit tests (Memory and NoOp backends, config validation, ID generation):
Integration tests against live PostgreSQL, Redis, or Oracle instances require the corresponding backend to be running and configured.