/// DDL batch for the conversation store.
///
/// Creates four tables — `conversations`, `conversation_participants`,
/// `conversation_messages` and `conversation_sessions` — together with their
/// secondary indexes. Every statement is guarded with `IF NOT EXISTS`, so the
/// batch can be executed again against a database that already holds these
/// objects.
///
/// Things to know before editing:
/// - `conversations` has a unique index on `(organization_id, idempotency_key)`.
/// - No foreign keys are declared; the `conversation_id` / `organization_id`
///   columns of the child tables are plain `TEXT`, and in
///   `conversation_messages` they are nullable.
/// - `conversation_participants.type` is restricted by a `CHECK` to `user`,
///   `ai-agent` or `human-agent`; `conversation_messages.direction` to
///   `inbound` or `outbound`.
/// - NOTE(review): `conversation_messages.seq` is `BIGSERIAL`, which in
///   PostgreSQL draws from a single table-wide sequence. Values increase but
///   are neither gap-free nor restarted per conversation — confirm that paging
///   relies only on ordering within `(conversation_id, seq)`.
pub const OLTP_SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY,
platform TEXT NOT NULL,
name TEXT NOT NULL,
organization_id TEXT NOT NULL,
idempotency_key TEXT NOT NULL,
metadata_json JSONB,
analytics_json JSONB,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);
-- Enforces conversation create idempotency on (org, idempotencyKey).
CREATE UNIQUE INDEX IF NOT EXISTS uq_conversations_org_idem
ON conversations (organization_id, idempotency_key);
CREATE INDEX IF NOT EXISTS idx_conversations_org_created
ON conversations (organization_id, created_at DESC);
CREATE TABLE IF NOT EXISTS conversation_participants (
id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL,
organization_id TEXT NOT NULL,
type TEXT NOT NULL CHECK (type IN ('user', 'ai-agent', 'human-agent')),
external_id TEXT,
internal_id TEXT,
browser_fingerprint TEXT,
browser_info JSONB,
name TEXT NOT NULL,
email TEXT,
phone TEXT,
crm_contact_id TEXT,
metadata_json JSONB,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_participants_conversation
ON conversation_participants (conversation_id, created_at);
-- Resolve a returning user by external identity within a conversation.
CREATE INDEX IF NOT EXISTS idx_participants_external
ON conversation_participants (conversation_id, external_id);
CREATE TABLE IF NOT EXISTS conversation_messages (
id TEXT PRIMARY KEY,
external_id TEXT,
organization_id TEXT,
conversation_id TEXT,
direction TEXT NOT NULL CHECK (direction IN ('inbound', 'outbound')),
content JSONB NOT NULL,
from_ref JSONB,
to_ref JSONB,
metadata_json JSONB,
analytics_json JSONB,
-- Monotonic append sequence per conversation; the stable paging cursor.
seq BIGSERIAL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_messages_conversation_seq
ON conversation_messages (conversation_id, seq);
CREATE TABLE IF NOT EXISTS conversation_sessions (
session_id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
agent_name TEXT NOT NULL,
user_participant_id TEXT NOT NULL,
agent_participant_id TEXT NOT NULL,
thread_id TEXT NOT NULL,
status TEXT,
token_count BIGINT,
message_count BIGINT,
metadata JSONB,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
ended_at TIMESTAMPTZ,
last_activity_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_sessions_conversation
ON conversation_sessions (conversation_id, created_at);
"#;
/// DDL batch for the administrative tables.
///
/// Creates:
/// - `connector_configs`, keyed by the composite `(org_id, id)`;
/// - `agent_settings`, keyed by `org_id` (so at most one row per organization);
/// - `indexing_runs`, keyed by `id`, with an index on
///   `(connector_name, started_at DESC)`.
///
/// Every statement uses `IF NOT EXISTS`, so the batch can be executed again
/// against a database that already holds these objects.
///
/// NOTE(review): `indexing_runs` has no organization column — runs are
/// associated with a connector by `connector_name` only. Confirm connector
/// names are unique across organizations, or that this table is not
/// multi-tenant.
pub const ADMIN_SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS connector_configs (
org_id TEXT NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL,
kind TEXT NOT NULL,
config JSONB NOT NULL,
enabled BOOLEAN NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (org_id, id)
);
CREATE TABLE IF NOT EXISTS agent_settings (
org_id TEXT PRIMARY KEY,
model TEXT NOT NULL,
system_prompt TEXT NOT NULL,
default_tools JSONB NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE IF NOT EXISTS indexing_runs (
id TEXT PRIMARY KEY,
connector_name TEXT NOT NULL,
status TEXT NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
finished_at TIMESTAMPTZ,
documents_seen BIGINT NOT NULL,
chunks_indexed BIGINT NOT NULL,
documents_skipped BIGINT NOT NULL,
cursor TIMESTAMPTZ,
error TEXT
);
-- list_runs(name) orders by started_at; latest_cursor scans the succeeded rows.
CREATE INDEX IF NOT EXISTS idx_indexing_runs_connector_started
ON indexing_runs (connector_name, started_at DESC);
"#;
/// Statement that installs the PostgreSQL `vector` extension when it is not
/// already present.
///
/// The DDL returned by `knowledge_vectors_schema` and `memories_schema` uses a
/// `vector(n)` column type and `hnsw` indexes with `vector_cosine_ops`;
/// presumably these are provided by this extension (pgvector), so this
/// statement is expected to run before either of them — confirm against the
/// migration caller.
pub const VECTOR_EXTENSION: &str = "CREATE EXTENSION IF NOT EXISTS vector;";
/// Builds the DDL batch for the `knowledge_vectors` table and its indexes.
///
/// `dim` is written verbatim into the `embedding vector(<dim>)` column type;
/// it is not validated here. The rest of the batch is fixed text: the table
/// definition, an `ALTER TABLE ... ADD COLUMN IF NOT EXISTS acl`, an HNSW
/// index over `embedding`, a GIN index over the generated `content_tsv`
/// column, and a b-tree index on `organization_id`.
///
/// Returns the full batch as one `String`, beginning and ending with a
/// newline.
#[must_use]
pub fn knowledge_vectors_schema(dim: usize) -> String {
    // Everything up to (and including) the opening parenthesis of the
    // embedding column's type.
    const BEFORE_DIM: &str = r#"
CREATE TABLE IF NOT EXISTS knowledge_vectors (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL,
organization_id TEXT,
source TEXT NOT NULL,
content TEXT NOT NULL,
embedding vector("#;

    // Everything from the closing parenthesis of the embedding column's type
    // to the end of the batch.
    const AFTER_DIM: &str = r#") NOT NULL,
content_tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', content)) STORED,
metadata JSONB,
-- Document-level access control (feature gap G3), persisted so the ACL
-- survives the ingest(process)→serve(process) boundary (the in-memory ACL
-- side table cannot). The serialized DocAcl (`{public, users[], groups[]}`)
-- the document carried at ingest; NULL ⇒ no ACL recorded ⇒ org-public
-- (backward-compatible default). The chat retrieval path filters rows by the
-- requester's entitlements against this column (see knowledge.rs query_async).
acl JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Idempotent add for tables created before the ACL column existed (so an
-- upgrade-in-place gets the column without a destructive migration).
ALTER TABLE knowledge_vectors ADD COLUMN IF NOT EXISTS acl JSONB;
-- Dense ANN: HNSW over cosine distance (the `<=>` operator).
CREATE INDEX IF NOT EXISTS idx_knowledge_embedding_hnsw
ON knowledge_vectors USING hnsw (embedding vector_cosine_ops);
-- Sparse BM25-style keyword retrieval over the generated tsvector.
CREATE INDEX IF NOT EXISTS idx_knowledge_content_tsv
ON knowledge_vectors USING gin (content_tsv);
CREATE INDEX IF NOT EXISTS idx_knowledge_org
ON knowledge_vectors (organization_id);
"#;

    let width = dim.to_string();
    let mut ddl = String::with_capacity(BEFORE_DIM.len() + width.len() + AFTER_DIM.len());
    ddl.push_str(BEFORE_DIM);
    ddl.push_str(&width);
    ddl.push_str(AFTER_DIM);
    ddl
}
/// Builds the DDL batch for the `memories` table and its indexes.
///
/// `dim` is written verbatim into the `embedding vector(<dim>)` column type;
/// it is not validated here. The rest of the batch is fixed text: the table
/// definition, an HNSW index over `embedding`, and a b-tree index on
/// `(organization_id, user_id)`.
///
/// Returns the full batch as one `String`, beginning and ending with a
/// newline.
#[must_use]
pub fn memories_schema(dim: usize) -> String {
    // Everything up to (and including) the opening parenthesis of the
    // embedding column's type.
    const BEFORE_DIM: &str = r#"
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
organization_id TEXT NOT NULL,
-- NULL ⇒ org-wide memory (not bound to a single user).
user_id TEXT,
content TEXT NOT NULL,
memory_type TEXT NOT NULL,
relevance REAL NOT NULL DEFAULT 0,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
embedding vector("#;

    // Everything from the closing parenthesis of the embedding column's type
    // to the end of the batch.
    const AFTER_DIM: &str = r#") NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_accessed TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Dense ANN: HNSW over cosine distance (the `<=>` operator), as knowledge_vectors.
CREATE INDEX IF NOT EXISTS idx_memories_embedding_hnsw
ON memories USING hnsw (embedding vector_cosine_ops);
-- Namespace scan: recall always filters on (org_id, user_id) before ANN ranking.
CREATE INDEX IF NOT EXISTS idx_memories_namespace
ON memories (organization_id, user_id);
"#;

    let width = dim.to_string();
    let mut ddl = String::with_capacity(BEFORE_DIM.len() + width.len() + AFTER_DIM.len());
    ddl.push_str(BEFORE_DIM);
    ddl.push_str(&width);
    ddl.push_str(AFTER_DIM);
    ddl
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{GatewayEmbedder, OPENAI_SMALL_EMBEDDING_DIM};
    use smooth_operator::embedding::{DeterministicEmbedder, Embedder, DEFAULT_EMBEDDING_DIM};

    /// Gateway embedder fixture shared by both tests; it is only asked for
    /// its dimension here.
    fn gateway_embedder() -> GatewayEmbedder {
        GatewayEmbedder::new(
            "https://example.test/v1",
            "sk-test",
            "text-embedding-3-small",
            OPENAI_SMALL_EMBEDDING_DIM,
        )
    }

    /// Asserts that `ddl` declares the `expected` column type, reporting the
    /// embedder `label`, its `dim` and the whole DDL on failure.
    #[track_caller]
    fn assert_declares(ddl: &str, label: &str, dim: usize, expected: &str) {
        assert!(
            ddl.contains(expected),
            "{label} embedder ({dim}-d) must yield a {expected} column, got:\n{ddl}"
        );
    }

    /// The `knowledge_vectors` embedding column must be exactly as wide as
    /// the dimension reported by whichever embedder is active.
    #[test]
    fn store_column_width_matches_active_embedder_dim() {
        let det = DeterministicEmbedder::new();
        assert_eq!(det.dim(), DEFAULT_EMBEDDING_DIM);

        let det_ddl = knowledge_vectors_schema(det.dim());
        assert_declares(&det_ddl, "deterministic", det.dim(), "vector(1024)");
        assert!(!det_ddl.contains("vector(1536)"));

        let gw = gateway_embedder();
        assert_eq!(gw.dim(), OPENAI_SMALL_EMBEDDING_DIM);

        let gw_ddl = knowledge_vectors_schema(gw.dim());
        assert_declares(&gw_ddl, "gateway", gw.dim(), "vector(1536)");
        assert!(!gw_ddl.contains("vector(1024)"));
    }

    /// Same width contract for `memories`, plus the nullable `user_id`
    /// column and the namespace index.
    #[test]
    fn memories_column_width_matches_active_embedder_dim() {
        let det = DeterministicEmbedder::new();
        assert_eq!(det.dim(), DEFAULT_EMBEDDING_DIM);

        let det_ddl = memories_schema(det.dim());
        assert_declares(&det_ddl, "deterministic", det.dim(), "vector(1024)");
        assert!(!det_ddl.contains("vector(1536)"));
        assert!(det_ddl.contains("user_id TEXT,"));
        assert!(det_ddl.contains("idx_memories_namespace"));

        let gw = gateway_embedder();
        let gw_ddl = memories_schema(gw.dim());
        assert_declares(&gw_ddl, "gateway", gw.dim(), "vector(1536)");
        assert!(!gw_ddl.contains("vector(1024)"));
    }
}