1use super::methods::ConeIdentifier;
2use super::types::{ConeConfig, ConeError, ConeHandle, ConeId, ConeInfo, Message, MessageId, MessageRole, Position};
3use crate::activations::arbor::{ArborStorage, NodeId, TreeId};
4use crate::activations::storage::init_sqlite_pool;
5use crate::activation_db_path_from_module;
6use serde_json::Value;
7use sqlx::{sqlite::SqlitePool, Row};
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11use uuid::Uuid;
12
13#[derive(Debug, Clone)]
15pub struct ConeStorageConfig {
16 pub db_path: PathBuf,
18}
19
20impl Default for ConeStorageConfig {
21 fn default() -> Self {
22 Self {
23 db_path: activation_db_path_from_module!("cones.db"),
24 }
25 }
26}
27
28pub struct ConeStorage {
30 pool: SqlitePool,
31 arbor: Arc<ArborStorage>,
32}
33
34impl ConeStorage {
35 pub async fn new(config: ConeStorageConfig, arbor: Arc<ArborStorage>) -> Result<Self, ConeError> {
37 let pool = init_sqlite_pool(config.db_path).await?;
38
39 let storage = Self { pool, arbor };
40 storage.run_migrations().await?;
41
42 Ok(storage)
43 }
44
45 async fn run_migrations(&self) -> Result<(), ConeError> {
47 sqlx::query(
48 r#"
49 CREATE TABLE IF NOT EXISTS cones (
50 id TEXT PRIMARY KEY,
51 name TEXT NOT NULL UNIQUE,
52 model_id TEXT NOT NULL,
53 system_prompt TEXT,
54 tree_id TEXT NOT NULL,
55 canonical_head TEXT NOT NULL,
56 metadata TEXT,
57 created_at INTEGER NOT NULL,
58 updated_at INTEGER NOT NULL
59 );
60
61 CREATE TABLE IF NOT EXISTS messages (
62 id TEXT PRIMARY KEY,
63 cone_id TEXT NOT NULL,
64 role TEXT NOT NULL,
65 content TEXT NOT NULL,
66 model_id TEXT,
67 input_tokens INTEGER,
68 output_tokens INTEGER,
69 created_at INTEGER NOT NULL,
70 FOREIGN KEY (cone_id) REFERENCES cones(id) ON DELETE CASCADE
71 );
72
73 CREATE INDEX IF NOT EXISTS idx_cones_name ON cones(name);
74 CREATE INDEX IF NOT EXISTS idx_cones_tree ON cones(tree_id);
75 CREATE INDEX IF NOT EXISTS idx_messages_cone ON messages(cone_id);
76 "#,
77 )
78 .execute(&self.pool)
79 .await
80 .map_err(|e| ConeError::StorageError { operation: "migration".into(), detail: e.to_string() })?;
81
82 Ok(())
83 }
84
85 pub fn arbor(&self) -> &ArborStorage {
87 &self.arbor
88 }
89
90 pub async fn cone_create(
99 &self,
100 name: String,
101 model_id: String,
102 system_prompt: Option<String>,
103 metadata: Option<Value>,
104 ) -> Result<ConeConfig, ConeError> {
105 let cone_id = ConeId::new_v4();
106 let now = current_timestamp();
107
108 let tree_id = self.arbor.tree_create(metadata.clone(), &cone_id.to_string()).await
110 .map_err(|e| ConeError::ArborError { detail: format!("Failed to create tree: {}", e) })?;
111
112 let tree = self.arbor.tree_get(&tree_id).await
114 .map_err(|e| ConeError::ArborError { detail: format!("Failed to get tree: {}", e) })?;
115 let head = Position::new(tree_id, tree.root);
116
117 let metadata_json = metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
118
119 let final_name = match sqlx::query(
121 "INSERT INTO cones (id, name, model_id, system_prompt, tree_id, canonical_head, metadata, created_at, updated_at)
122 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
123 )
124 .bind(cone_id.to_string())
125 .bind(&name)
126 .bind(&model_id)
127 .bind(&system_prompt)
128 .bind(head.tree_id.to_string())
129 .bind(head.node_id.to_string())
130 .bind(metadata_json.clone())
131 .bind(now)
132 .bind(now)
133 .execute(&self.pool)
134 .await {
135 Ok(_) => name, Err(e) if e.to_string().contains("UNIQUE constraint failed") => {
137 let unique_name = format!("{}#{}", name, cone_id);
139
140 sqlx::query(
141 "INSERT INTO cones (id, name, model_id, system_prompt, tree_id, canonical_head, metadata, created_at, updated_at)
142 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
143 )
144 .bind(cone_id.to_string())
145 .bind(&unique_name)
146 .bind(&model_id)
147 .bind(&system_prompt)
148 .bind(head.tree_id.to_string())
149 .bind(head.node_id.to_string())
150 .bind(metadata_json)
151 .bind(now)
152 .bind(now)
153 .execute(&self.pool)
154 .await
155 .map_err(|e| ConeError::StorageError { operation: "create_cone".into(), detail: e.to_string() })?;
156
157 unique_name
158 }
159 Err(e) => return Err(ConeError::StorageError { operation: "create_cone".into(), detail: e.to_string() }),
160 };
161
162 Ok(ConeConfig {
163 id: cone_id,
164 name: final_name,
165 model_id,
166 system_prompt,
167 head,
168 metadata,
169 created_at: now,
170 updated_at: now,
171 })
172 }
173
174 pub async fn resolve_cone_identifier(&self, identifier: &ConeIdentifier) -> Result<ConeId, ConeError> {
182 match identifier {
183 ConeIdentifier::ById { id } => Ok(*id),
184 ConeIdentifier::ByName { name } => {
185 if let Some(row) = sqlx::query("SELECT id FROM cones WHERE name = ?")
187 .bind(name)
188 .fetch_optional(&self.pool)
189 .await
190 .map_err(|e| ConeError::StorageError { operation: "resolve_by_name".into(), detail: e.to_string() })?
191 {
192 let id_str: String = row.get("id");
193 return Uuid::parse_str(&id_str)
194 .map_err(|e| ConeError::StorageError { operation: "parse_cone_id".into(), detail: e.to_string() });
195 }
196
197 let pattern = format!("{}%", name);
200 let rows = sqlx::query("SELECT id, name FROM cones WHERE name LIKE ?")
201 .bind(&pattern)
202 .fetch_all(&self.pool)
203 .await
204 .map_err(|e| ConeError::StorageError { operation: "resolve_by_pattern".into(), detail: e.to_string() })?;
205
206 match rows.len() {
207 0 => Err(ConeError::SessionNotFound { name: name.clone() }),
208 1 => {
209 let id_str: String = rows[0].get("id");
210 Uuid::parse_str(&id_str)
211 .map_err(|e| ConeError::StorageError { operation: "parse_cone_id".into(), detail: e.to_string() })
212 }
213 _ => {
214 let matches: Vec<String> = rows.iter().map(|r| r.get("name")).collect();
216 Err(ConeError::InvalidState {
217 message: format!(
218 "Ambiguous name '{}' matches multiple cones: {}. Use full name with #uuid to disambiguate.",
219 name,
220 matches.join(", ")
221 ),
222 })
223 }
224 }
225 }
226 }
227 }
228
229 pub async fn cone_get(&self, cone_id: &ConeId) -> Result<ConeConfig, ConeError> {
231 let row = sqlx::query(
232 "SELECT id, name, model_id, system_prompt, tree_id, canonical_head, metadata, created_at, updated_at
233 FROM cones WHERE id = ?",
234 )
235 .bind(cone_id.to_string())
236 .fetch_optional(&self.pool)
237 .await
238 .map_err(|e| ConeError::StorageError { operation: "fetch_cone".into(), detail: e.to_string() })?
239 .ok_or_else(|| ConeError::SessionNotFound { name: cone_id.to_string() })?;
240
241 self.row_to_cone_config(row)
242 }
243
244 pub async fn cone_get_by_identifier(&self, identifier: &ConeIdentifier) -> Result<ConeConfig, ConeError> {
246 let cone_id = self.resolve_cone_identifier(identifier).await?;
247 self.cone_get(&cone_id).await
248 }
249
250 pub async fn cone_list(&self) -> Result<Vec<ConeInfo>, ConeError> {
252 let rows = sqlx::query(
253 "SELECT id, name, model_id, tree_id, canonical_head, created_at FROM cones ORDER BY created_at DESC",
254 )
255 .fetch_all(&self.pool)
256 .await
257 .map_err(|e| ConeError::StorageError { operation: "list_cones".into(), detail: e.to_string() })?;
258
259 let cones: Result<Vec<ConeInfo>, ConeError> = rows
260 .iter()
261 .map(|row| {
262 let id_str: String = row.get("id");
263 let tree_id_str: String = row.get("tree_id");
264 let head_str: String = row.get("canonical_head");
265
266 let tree_id = TreeId::parse_str(&tree_id_str).map_err(|e| ConeError::StorageError { operation: "parse_tree_id".into(), detail: e.to_string() })?;
267 let node_id = NodeId::parse_str(&head_str).map_err(|e| ConeError::StorageError { operation: "parse_node_id".into(), detail: e.to_string() })?;
268
269 Ok(ConeInfo {
270 id: Uuid::parse_str(&id_str).map_err(|e| ConeError::StorageError { operation: "parse_cone_id".into(), detail: e.to_string() })?,
271 name: row.get("name"),
272 model_id: row.get("model_id"),
273 head: Position::new(tree_id, node_id),
274 created_at: row.get("created_at"),
275 })
276 })
277 .collect();
278
279 cones
280 }
281
282 pub async fn cone_update_head(
284 &self,
285 cone_id: &ConeId,
286 new_head: NodeId,
287 ) -> Result<(), ConeError> {
288 let now = current_timestamp();
289
290 let result = sqlx::query(
291 "UPDATE cones SET canonical_head = ?, updated_at = ? WHERE id = ?",
292 )
293 .bind(new_head.to_string())
294 .bind(now)
295 .bind(cone_id.to_string())
296 .execute(&self.pool)
297 .await
298 .map_err(|e| ConeError::StorageError { operation: "update_head".into(), detail: e.to_string() })?;
299
300 if result.rows_affected() == 0 {
301 return Err(ConeError::SessionNotFound { name: cone_id.to_string() });
302 }
303
304 Ok(())
305 }
306
307 pub async fn cone_update(
309 &self,
310 cone_id: &ConeId,
311 name: Option<String>,
312 model_id: Option<String>,
313 system_prompt: Option<Option<String>>,
314 metadata: Option<Value>,
315 ) -> Result<(), ConeError> {
316 let now = current_timestamp();
317
318 let current = self.cone_get(cone_id).await?;
320
321 let new_name = name.unwrap_or(current.name);
322 let new_model = model_id.unwrap_or(current.model_id);
323 let new_prompt = system_prompt.unwrap_or(current.system_prompt);
324 let new_metadata = metadata.or(current.metadata);
325 let metadata_json = new_metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
326
327 sqlx::query(
328 "UPDATE cones SET name = ?, model_id = ?, system_prompt = ?, metadata = ?, updated_at = ? WHERE id = ?",
329 )
330 .bind(&new_name)
331 .bind(&new_model)
332 .bind(&new_prompt)
333 .bind(metadata_json)
334 .bind(now)
335 .bind(cone_id.to_string())
336 .execute(&self.pool)
337 .await
338 .map_err(|e| ConeError::StorageError { operation: "update_cone".into(), detail: e.to_string() })?;
339
340 Ok(())
341 }
342
343 pub async fn cone_delete(&self, cone_id: &ConeId) -> Result<(), ConeError> {
345 let result = sqlx::query("DELETE FROM cones WHERE id = ?")
346 .bind(cone_id.to_string())
347 .execute(&self.pool)
348 .await
349 .map_err(|e| ConeError::StorageError { operation: "delete_cone".into(), detail: e.to_string() })?;
350
351 if result.rows_affected() == 0 {
352 return Err(ConeError::SessionNotFound { name: cone_id.to_string() });
353 }
354
355 Ok(())
356 }
357
358 pub async fn message_create(
364 &self,
365 cone_id: &ConeId,
366 role: MessageRole,
367 content: String,
368 model_id: Option<String>,
369 input_tokens: Option<i64>,
370 output_tokens: Option<i64>,
371 ) -> Result<Message, ConeError> {
372 let message_id = MessageId::new_v4();
373 let now = current_timestamp();
374
375 sqlx::query(
376 "INSERT INTO messages (id, cone_id, role, content, model_id, input_tokens, output_tokens, created_at)
377 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
378 )
379 .bind(message_id.to_string())
380 .bind(cone_id.to_string())
381 .bind(role.as_str())
382 .bind(&content)
383 .bind(&model_id)
384 .bind(input_tokens)
385 .bind(output_tokens)
386 .bind(now)
387 .execute(&self.pool)
388 .await
389 .map_err(|e| ConeError::StorageError { operation: "create_message".into(), detail: e.to_string() })?;
390
391 Ok(Message {
392 id: message_id,
393 cone_id: *cone_id,
394 role,
395 content,
396 created_at: now,
397 model_id,
398 input_tokens,
399 output_tokens,
400 })
401 }
402
403 pub async fn message_create_ephemeral(
405 &self,
406 cone_id: &ConeId,
407 role: MessageRole,
408 content: String,
409 model_id: Option<String>,
410 input_tokens: Option<i64>,
411 output_tokens: Option<i64>,
412 ) -> Result<Message, ConeError> {
413 let message_id = MessageId::new_v4();
414 let now = current_timestamp();
415
416 let ephemeral_marker = -now;
418
419 sqlx::query(
420 "INSERT INTO messages (id, cone_id, role, content, model_id, input_tokens, output_tokens, created_at)
421 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
422 )
423 .bind(message_id.to_string())
424 .bind(cone_id.to_string())
425 .bind(role.as_str())
426 .bind(&content)
427 .bind(&model_id)
428 .bind(input_tokens)
429 .bind(output_tokens)
430 .bind(ephemeral_marker)
431 .execute(&self.pool)
432 .await
433 .map_err(|e| ConeError::StorageError { operation: "create_ephemeral_message".into(), detail: e.to_string() })?;
434
435 Ok(Message {
436 id: message_id,
437 cone_id: *cone_id,
438 role,
439 content,
440 created_at: ephemeral_marker,
441 model_id,
442 input_tokens,
443 output_tokens,
444 })
445 }
446
447 pub async fn message_get(&self, message_id: &MessageId) -> Result<Message, ConeError> {
449 let row = sqlx::query(
450 "SELECT id, cone_id, role, content, model_id, input_tokens, output_tokens, created_at
451 FROM messages WHERE id = ?",
452 )
453 .bind(message_id.to_string())
454 .fetch_optional(&self.pool)
455 .await
456 .map_err(|e| ConeError::StorageError { operation: "fetch_message".into(), detail: e.to_string() })?
457 .ok_or_else(|| ConeError::SessionNotFound { name: format!("message:{}", message_id) })?;
458
459 self.row_to_message(row)
460 }
461
462 pub async fn resolve_message_handle(&self, identifier: &str) -> Result<Message, ConeError> {
465 let parts: Vec<&str> = identifier.splitn(3, ':').collect();
467 if parts.len() < 2 {
468 return Err(format!("Invalid message handle format: {}", identifier).into());
469 }
470
471 let msg_part = parts[0];
472 if !msg_part.starts_with("msg-") {
473 return Err(format!("Invalid message handle format: {}", identifier).into());
474 }
475
476 let message_id_str = &msg_part[4..]; let message_id = Uuid::parse_str(message_id_str)
478 .map_err(|e| ConeError::StorageError { operation: "parse_handle_id".into(), detail: e.to_string() })?;
479
480 self.message_get(&message_id).await
481 }
482
483 pub fn message_to_handle(message: &Message, name: &str) -> crate::types::Handle {
488 ConeHandle::Message {
489 message_id: format!("msg-{}", message.id),
490 role: message.role.as_str().to_string(),
491 name: name.to_string(),
492 }.to_handle()
493 }
494
495 fn row_to_message(&self, row: sqlx::sqlite::SqliteRow) -> Result<Message, ConeError> {
500 let id_str: String = row.get("id");
501 let cone_id_str: String = row.get("cone_id");
502 let role_str: String = row.get("role");
503
504 Ok(Message {
505 id: Uuid::parse_str(&id_str).map_err(|e| ConeError::StorageError { operation: "parse_message_id".into(), detail: e.to_string() })?,
506 cone_id: Uuid::parse_str(&cone_id_str).map_err(|e| ConeError::StorageError { operation: "parse_cone_id".into(), detail: e.to_string() })?,
507 role: MessageRole::from_str(&role_str).ok_or_else(|| ConeError::StorageError { operation: "parse_role".into(), detail: format!("Invalid role: {}", role_str) })?,
508 content: row.get("content"),
509 created_at: row.get("created_at"),
510 model_id: row.get("model_id"),
511 input_tokens: row.get("input_tokens"),
512 output_tokens: row.get("output_tokens"),
513 })
514 }
515
516 fn row_to_cone_config(&self, row: sqlx::sqlite::SqliteRow) -> Result<ConeConfig, ConeError> {
517 let id_str: String = row.get("id");
518 let tree_id_str: String = row.get("tree_id");
519 let head_str: String = row.get("canonical_head");
520 let metadata_json: Option<String> = row.get("metadata");
521
522 let tree_id = TreeId::parse_str(&tree_id_str).map_err(|e| ConeError::StorageError { operation: "parse_tree_id".into(), detail: e.to_string() })?;
523 let node_id = NodeId::parse_str(&head_str).map_err(|e| ConeError::StorageError { operation: "parse_node_id".into(), detail: e.to_string() })?;
524
525 Ok(ConeConfig {
526 id: Uuid::parse_str(&id_str).map_err(|e| ConeError::StorageError { operation: "parse_cone_id".into(), detail: e.to_string() })?,
527 name: row.get("name"),
528 model_id: row.get("model_id"),
529 system_prompt: row.get("system_prompt"),
530 head: Position::new(tree_id, node_id),
531 metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
532 created_at: row.get("created_at"),
533 updated_at: row.get("updated_at"),
534 })
535 }
536}
537
538fn current_timestamp() -> i64 {
540 SystemTime::now()
541 .duration_since(UNIX_EPOCH)
542 .unwrap()
543 .as_secs() as i64
544}
545
546#[cfg(test)]
547mod tests {
548 use super::*;
549 use super::super::Cone;
550
551 #[test]
560 fn invariant_handle_meta_format_matches_resolver() {
561 let message = Message {
563 id: Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(),
564 cone_id: Uuid::new_v4(),
565 role: MessageRole::User,
566 content: "test content".to_string(),
567 created_at: 0,
568 model_id: None,
569 input_tokens: None,
570 output_tokens: None,
571 };
572
573 let handle = ConeStorage::message_to_handle(&message, "test-cone");
575
576 let identifier = handle.meta.join(":");
578
579 let parts: Vec<&str> = identifier.splitn(3, ':').collect();
582
583 assert_eq!(parts.len(), 3, "identifier should have 3 parts: {}", identifier);
584 assert!(parts[0].starts_with("msg-"), "first part should start with 'msg-': {}", parts[0]);
585
586 let msg_part = parts[0];
588 let message_id_str = &msg_part[4..]; let parsed_id = Uuid::parse_str(message_id_str);
590 assert!(parsed_id.is_ok(), "should be able to parse UUID from meta[0]");
591 assert_eq!(parsed_id.unwrap(), message.id);
592
593 assert_eq!(parts[1], "user");
595
596 assert_eq!(parts[2], "test-cone");
598 }
599
600 #[test]
601 fn invariant_handle_meta_roles() {
602 for (role, expected_str) in [
604 (MessageRole::User, "user"),
605 (MessageRole::Assistant, "assistant"),
606 (MessageRole::System, "system"),
607 ] {
608 let message = Message {
609 id: Uuid::new_v4(),
610 cone_id: Uuid::new_v4(),
611 role,
612 content: "test".to_string(),
613 created_at: 0,
614 model_id: None,
615 input_tokens: None,
616 output_tokens: None,
617 };
618
619 let handle = ConeStorage::message_to_handle(&message, "cone");
620 assert_eq!(handle.meta[1], expected_str);
621 }
622 }
623
624 #[test]
625 fn invariant_handle_plugin_method_fixed() {
626 let message = Message {
628 id: Uuid::new_v4(),
629 cone_id: Uuid::new_v4(),
630 role: MessageRole::User,
631 content: "test".to_string(),
632 created_at: 0,
633 model_id: None,
634 input_tokens: None,
635 output_tokens: None,
636 };
637
638 let handle = ConeStorage::message_to_handle(&message, "any-name");
639
640 assert_eq!(handle.plugin_id, Cone::PLUGIN_ID);
642 assert_eq!(handle.version, "1.0.0");
643 assert_eq!(handle.method, "chat");
644 }
645
646 #[test]
647 fn invariant_handle_meta_has_three_parts() {
648 let message = Message {
650 id: Uuid::new_v4(),
651 cone_id: Uuid::new_v4(),
652 role: MessageRole::Assistant,
653 content: "response".to_string(),
654 created_at: 0,
655 model_id: Some("gpt-4".to_string()),
656 input_tokens: Some(10),
657 output_tokens: Some(20),
658 };
659
660 let handle = ConeStorage::message_to_handle(&message, "my-cone");
661
662 assert_eq!(handle.meta.len(), 3, "cone chat handle must have exactly 3 meta parts");
663 }
664}