Skip to main content

do_memory_storage_turso/
turso_config.rs

1//! Turso storage configuration and schema initialization.
2//!
3//! This module provides schema initialization methods to keep lib.rs under 500 LOC.
4
5use crate::{Result, TursoStorage, schema};
6use tracing::{debug, info};
7
8impl TursoStorage {
9    /// Initialize the database schema
10    ///
11    /// Creates tables and indexes if they don't exist.
12    /// Safe to call multiple times.
13    pub async fn initialize_schema(&self) -> Result<()> {
14        info!("Initializing Turso database schema");
15        let conn = self.get_connection().await?;
16
17        // Enable WAL mode for better concurrent access
18        let _ = self.execute_pragmas(&conn).await;
19
20        // Create tables
21        self.execute_with_retry(&conn, schema::CREATE_EPISODES_TABLE)
22            .await?;
23        self.ensure_episodes_checkpoints_column(&conn).await?;
24        self.execute_with_retry(&conn, schema::CREATE_PATTERNS_TABLE)
25            .await?;
26        self.execute_with_retry(&conn, schema::CREATE_HEURISTICS_TABLE)
27            .await?;
28        self.execute_with_retry(&conn, schema::CREATE_RECOMMENDATION_SESSIONS_TABLE)
29            .await?;
30        self.execute_with_retry(&conn, schema::CREATE_RECOMMENDATION_FEEDBACK_TABLE)
31            .await?;
32
33        // Create legacy embeddings table only when multi-dimension feature is NOT enabled
34        #[cfg(not(feature = "turso_multi_dimension"))]
35        self.execute_with_retry(&conn, schema::CREATE_EMBEDDINGS_TABLE)
36            .await?;
37
38        // Create monitoring tables
39        self.execute_with_retry(&conn, schema::CREATE_EXECUTION_RECORDS_TABLE)
40            .await?;
41        self.execute_with_retry(&conn, schema::CREATE_AGENT_METRICS_TABLE)
42            .await?;
43        self.execute_with_retry(&conn, schema::CREATE_TASK_METRICS_TABLE)
44            .await?;
45
46        // Create indexes
47        self.execute_with_retry(&conn, schema::CREATE_EPISODES_TASK_TYPE_INDEX)
48            .await?;
49        self.execute_with_retry(&conn, schema::CREATE_EPISODES_TIMESTAMP_INDEX)
50            .await?;
51        self.execute_with_retry(&conn, schema::CREATE_EPISODES_DOMAIN_INDEX)
52            .await?;
53        self.execute_with_retry(&conn, schema::CREATE_EPISODES_ARCHIVED_INDEX)
54            .await?;
55        self.execute_with_retry(&conn, schema::CREATE_PATTERNS_CONTEXT_INDEX)
56            .await?;
57        self.execute_with_retry(&conn, schema::CREATE_HEURISTICS_CONFIDENCE_INDEX)
58            .await?;
59        self.execute_with_retry(&conn, schema::CREATE_RECOMMENDATION_SESSIONS_EPISODE_INDEX)
60            .await?;
61
62        // Create legacy embeddings indexes
63        #[cfg(not(feature = "turso_multi_dimension"))]
64        {
65            self.execute_with_retry(&conn, schema::CREATE_EMBEDDINGS_ITEM_INDEX)
66                .await?;
67            self.execute_with_retry(&conn, schema::CREATE_EMBEDDINGS_VECTOR_INDEX)
68                .await?;
69        }
70
71        // Create monitoring indexes
72        self.execute_with_retry(&conn, schema::CREATE_EXECUTION_RECORDS_TIME_INDEX)
73            .await?;
74        self.execute_with_retry(&conn, schema::CREATE_EXECUTION_RECORDS_AGENT_INDEX)
75            .await?;
76        self.execute_with_retry(&conn, schema::CREATE_AGENT_METRICS_TYPE_INDEX)
77            .await?;
78
79        // Create Phase 2 (GENESIS) tables and indexes
80        self.execute_with_retry(&conn, schema::CREATE_EPISODE_SUMMARIES_TABLE)
81            .await?;
82        self.execute_with_retry(&conn, schema::CREATE_SUMMARIES_CREATED_AT_INDEX)
83            .await?;
84        self.execute_with_retry(&conn, schema::CREATE_METADATA_TABLE)
85            .await?;
86
87        // Create Episode Tags tables and indexes
88        self.execute_with_retry(&conn, schema::CREATE_EPISODE_TAGS_TABLE)
89            .await?;
90        self.execute_with_retry(&conn, schema::CREATE_EPISODE_TAGS_TAG_INDEX)
91            .await?;
92        self.execute_with_retry(&conn, schema::CREATE_EPISODE_TAGS_EPISODE_INDEX)
93            .await?;
94        self.execute_with_retry(&conn, schema::CREATE_TAG_METADATA_TABLE)
95            .await?;
96
97        // Create Episode Relationships table and indexes
98        self.execute_with_retry(&conn, schema::CREATE_EPISODE_RELATIONSHIPS_TABLE)
99            .await?;
100        self.execute_with_retry(&conn, schema::CREATE_RELATIONSHIPS_FROM_INDEX)
101            .await?;
102        self.execute_with_retry(&conn, schema::CREATE_RELATIONSHIPS_TO_INDEX)
103            .await?;
104        self.execute_with_retry(&conn, schema::CREATE_RELATIONSHIPS_TYPE_INDEX)
105            .await?;
106        self.execute_with_retry(&conn, schema::CREATE_RELATIONSHIPS_BIDIRECTIONAL_INDEX)
107            .await?;
108
109        // Create FTS5 tables for hybrid search (feature-gated)
110        #[cfg(feature = "hybrid_search")]
111        self.initialize_fts5_schema(&conn).await?;
112
113        // Create dimension-specific vector tables (Phase 0)
114        #[cfg(feature = "turso_multi_dimension")]
115        self.initialize_vector_tables(&conn).await?;
116
117        info!("Schema initialization complete");
118        Ok(())
119    }
120
121    /// Initialize FTS5 schema for hybrid search
122    #[cfg(feature = "hybrid_search")]
123    async fn initialize_fts5_schema(&self, conn: &libsql::Connection) -> Result<()> {
124        use crate::fts5_schema;
125        info!("Initializing FTS5 schema for hybrid search");
126        self.execute_with_retry(conn, fts5_schema::CREATE_EPISODES_FTS_TABLE)
127            .await?;
128        self.execute_with_retry(conn, fts5_schema::CREATE_PATTERNS_FTS_TABLE)
129            .await?;
130        self.execute_with_retry(conn, fts5_schema::CREATE_EPISODES_FTS_TRIGGERS)
131            .await?;
132        self.execute_with_retry(conn, fts5_schema::CREATE_PATTERNS_FTS_TRIGGERS)
133            .await?;
134        info!("FTS5 schema initialization complete");
135        Ok(())
136    }
137
138    #[cfg(not(feature = "hybrid_search"))]
139    #[allow(dead_code)] // Feature-gated stub: empty implementation when hybrid_search disabled
140    async fn initialize_fts5_schema(&self, _conn: &libsql::Connection) -> Result<()> {
141        Ok(())
142    }
143
144    /// Initialize dimension-specific vector tables
145    #[cfg(feature = "turso_multi_dimension")]
146    async fn initialize_vector_tables(&self, conn: &libsql::Connection) -> Result<()> {
147        info!("Initializing dimension-specific vector tables");
148
149        // Create tables
150        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_384_TABLE)
151            .await?;
152        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1024_TABLE)
153            .await?;
154        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1536_TABLE)
155            .await?;
156        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_3072_TABLE)
157            .await?;
158        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_OTHER_TABLE)
159            .await?;
160
161        // Create vector indexes
162        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_384_VECTOR_INDEX)
163            .await?;
164        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1024_VECTOR_INDEX)
165            .await?;
166        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1536_VECTOR_INDEX)
167            .await?;
168        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_3072_VECTOR_INDEX)
169            .await?;
170
171        // Create item indexes
172        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_384_ITEM_INDEX)
173            .await?;
174        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1024_ITEM_INDEX)
175            .await?;
176        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1536_ITEM_INDEX)
177            .await?;
178        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_3072_ITEM_INDEX)
179            .await?;
180        self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_OTHER_ITEM_INDEX)
181            .await?;
182
183        info!("Dimension-specific vector tables initialized");
184        Ok(())
185    }
186
187    #[cfg(not(feature = "turso_multi_dimension"))]
188    #[allow(dead_code)] // Feature-gated stub: empty implementation when turso_multi_dimension disabled
189    async fn initialize_vector_tables(&self, _conn: &libsql::Connection) -> Result<()> {
190        Ok(())
191    }
192
193    /// Ensure the episodes.checkpoints column exists for backward compatibility.
194    async fn ensure_episodes_checkpoints_column(&self, conn: &libsql::Connection) -> Result<()> {
195        let mut rows = conn
196            .query("PRAGMA table_info(episodes)", ())
197            .await
198            .map_err(|e| {
199                do_memory_core::Error::Storage(format!("Failed to inspect episodes schema: {}", e))
200            })?;
201
202        let mut has_checkpoints = false;
203        while let Some(row) = rows.next().await.map_err(|e| {
204            do_memory_core::Error::Storage(format!("Failed to read episodes schema row: {}", e))
205        })? {
206            let column_name: String = row.get(1).map_err(|e| {
207                do_memory_core::Error::Storage(format!(
208                    "Failed to parse episodes schema column name: {}",
209                    e
210                ))
211            })?;
212
213            if column_name == "checkpoints" {
214                has_checkpoints = true;
215                break;
216            }
217        }
218
219        if !has_checkpoints {
220            debug!("Adding missing episodes.checkpoints column");
221            self.execute_with_retry(conn, schema::ADD_EPISODES_CHECKPOINTS_COLUMN)
222                .await?;
223        }
224
225        Ok(())
226    }
227}