1use super::types::{
2 ArborError, ArborId, Node, NodeId, NodeType, ResourceRefs, ResourceState, Tree, TreeId, Handle,
3};
4use crate::activations::storage::init_sqlite_pool;
5use crate::activation_db_path_from_module;
6use serde_json::Value;
7use sqlx::{sqlite::SqlitePool, Row};
8use uuid::Uuid;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::time::{SystemTime, UNIX_EPOCH};
12
/// Tunable settings for the arbor storage layer.
#[derive(Debug, Clone)]
pub struct ArborConfig {
    /// Grace period, in seconds, before scheduled deletions are finalized
    /// (default: 7 days).
    pub scheduled_deletion_window: i64,
    /// Retention window, in seconds, for archived resources
    /// (default: 30 days).
    pub archive_window: i64,
    /// Filesystem location of the backing SQLite database.
    pub db_path: PathBuf,
    /// Whether periodic cleanup should run automatically.
    pub auto_cleanup: bool,
    /// Interval, in seconds, between automatic cleanup passes
    /// (default: 1 hour).
    pub cleanup_interval: i64,
}
31
32impl Default for ArborConfig {
33 fn default() -> Self {
34 Self {
35 scheduled_deletion_window: 604800, archive_window: 2592000, db_path: activation_db_path_from_module!("arbor.db"),
38 auto_cleanup: true,
39 cleanup_interval: 3600, }
41 }
42}
43
/// SQLite-backed persistence for arbor trees, nodes, and their per-owner
/// reference counts.
pub struct ArborStorage {
    // Connection pool to the backing SQLite database.
    pool: SqlitePool,
    // Settings captured at construction; retained for cleanup/windowing
    // logic even where not yet read.
    #[allow(dead_code)]
    config: ArborConfig,
}
73
74impl ArborStorage {
75 pub async fn new(config: ArborConfig) -> Result<Self, ArborError> {
77 let pool = init_sqlite_pool(config.db_path.clone()).await?;
78
79 let storage = Self { pool, config };
80 storage.run_migrations().await?;
81
82 Ok(storage)
83 }
84
    /// Creates all tables and indexes if they do not already exist.
    ///
    /// Every statement is idempotent (`IF NOT EXISTS`), so this is safe to
    /// run on each startup. Ids are stored as TEXT, timestamps as
    /// Unix-second INTEGERs, and lifecycle is tracked in a `state` column
    /// ('active' / 'scheduled_delete' / 'archived').
    ///
    /// NOTE(review): the ON DELETE CASCADE clauses only take effect when the
    /// connection enables `PRAGMA foreign_keys = ON` — presumably handled in
    /// `init_sqlite_pool`; confirm.
    async fn run_migrations(&self) -> Result<(), ArborError> {
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS trees (
                id TEXT PRIMARY KEY,
                root_node_id TEXT NOT NULL,
                ref_count INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'active',
                scheduled_deletion_at INTEGER,
                archived_at INTEGER,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL,
                metadata TEXT
            );

            CREATE TABLE IF NOT EXISTS tree_refs (
                tree_id TEXT NOT NULL,
                owner_id TEXT NOT NULL,
                count INTEGER NOT NULL DEFAULT 1,
                claimed_at INTEGER NOT NULL,
                PRIMARY KEY (tree_id, owner_id),
                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS nodes (
                id TEXT PRIMARY KEY,
                tree_id TEXT NOT NULL,
                parent_id TEXT,
                ref_count INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'active',
                scheduled_deletion_at INTEGER,
                archived_at INTEGER,
                node_type TEXT NOT NULL,
                content TEXT,
                handle_plugin_id TEXT,
                handle_version TEXT,
                handle_method TEXT,
                handle_meta TEXT,
                created_at INTEGER NOT NULL,
                metadata TEXT,
                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS node_refs (
                node_id TEXT NOT NULL,
                owner_id TEXT NOT NULL,
                count INTEGER NOT NULL DEFAULT 1,
                claimed_at INTEGER NOT NULL,
                PRIMARY KEY (node_id, owner_id),
                FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS node_children (
                parent_id TEXT NOT NULL,
                child_id TEXT NOT NULL,
                position INTEGER NOT NULL,
                PRIMARY KEY (parent_id, child_id),
                FOREIGN KEY (parent_id) REFERENCES nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (child_id) REFERENCES nodes(id) ON DELETE CASCADE
            );

            CREATE INDEX IF NOT EXISTS idx_trees_state ON trees(state);
            CREATE INDEX IF NOT EXISTS idx_trees_scheduled ON trees(scheduled_deletion_at) WHERE state = 'scheduled_delete';
            CREATE INDEX IF NOT EXISTS idx_trees_archived ON trees(archived_at) WHERE state = 'archived';
            CREATE INDEX IF NOT EXISTS idx_nodes_tree ON nodes(tree_id);
            CREATE INDEX IF NOT EXISTS idx_nodes_parent ON nodes(parent_id);
            CREATE INDEX IF NOT EXISTS idx_nodes_state ON nodes(state);
            CREATE INDEX IF NOT EXISTS idx_nodes_scheduled ON nodes(scheduled_deletion_at) WHERE state = 'scheduled_delete';
            CREATE INDEX IF NOT EXISTS idx_node_children_parent ON node_children(parent_id);
            CREATE INDEX IF NOT EXISTS idx_node_children_child ON node_children(child_id);
            "#,
        )
        .execute(&self.pool)
        .await
        .map_err(|e| ArborError::InitError { detail: format!("Failed to run migrations: {}", e) })?;

        Ok(())
    }
164
165 pub async fn tree_create(
171 &self,
172 metadata: Option<serde_json::Value>,
173 owner_id: &str,
174 ) -> Result<TreeId, ArborError> {
175 self.tree_create_with_id(None, metadata, owner_id).await
176 }
177
    /// Creates a tree with the supplied id (or a fresh one when `None`),
    /// inserting the tree row, an initial reference for `owner_id`, and an
    /// empty-text root node in a single transaction.
    ///
    /// Returns the id of the created tree.
    pub async fn tree_create_with_id(
        &self,
        tree_id: Option<TreeId>,
        metadata: Option<serde_json::Value>,
        owner_id: &str,
    ) -> Result<TreeId, ArborError> {
        let tree_id = tree_id.unwrap_or_else(TreeId::new);
        let root_id = NodeId::new();
        let now = current_timestamp();

        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;

        // serde_json::to_string on a Value only fails for non-string map
        // keys, which Value cannot represent, so unwrap is safe here.
        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
        sqlx::query(
            "INSERT INTO trees (id, root_node_id, ref_count, state, created_at, updated_at, metadata)
             VALUES (?, ?, 1, 'active', ?, ?, ?)",
        )
        .bind(tree_id.to_string())
        .bind(root_id.to_string())
        .bind(now)
        .bind(now)
        .bind(metadata_json)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to create tree: {}", e))?;

        // Record the creator's initial reference (matches ref_count = 1
        // on the tree row above).
        sqlx::query(
            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at) VALUES (?, ?, 1, ?)",
        )
        .bind(tree_id.to_string())
        .bind(owner_id)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to create tree ref: {}", e))?;

        // Every tree starts with an empty text root node (no parent).
        sqlx::query(
            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, created_at)
             VALUES (?, ?, NULL, 1, 'active', 'text', '', ?)",
        )
        .bind(root_id.to_string())
        .bind(tree_id.to_string())
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to create root node: {}", e))?;

        tx.commit().await.map_err(|e| e.to_string())?;

        Ok(tree_id)
    }
233
234 pub async fn tree_get(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
236 self.tree_get_internal(tree_id, false).await
237 }
238
239 pub async fn tree_get_archived(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
241 self.tree_get_internal(tree_id, true).await
242 }
243
    /// Shared implementation for [`tree_get`] / [`tree_get_archived`].
    ///
    /// Loads the tree row, then eagerly fetches every node and the
    /// per-owner reference counts. Returns `TreeNotFound` when the row is
    /// missing and `InvalidState` when the tree is archived but
    /// `allow_archived` is false.
    async fn tree_get_internal(
        &self,
        tree_id: &TreeId,
        allow_archived: bool,
    ) -> Result<Tree, ArborError> {
        let tree_row = sqlx::query(
            "SELECT id, root_node_id, ref_count, state, scheduled_deletion_at, archived_at, created_at, updated_at, metadata
             FROM trees WHERE id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| ArborError::StorageError { operation: "fetch_tree".to_string(), detail: e.to_string() })?
        .ok_or_else(|| ArborError::TreeNotFound { tree_id: tree_id.to_string() })?;

        // Unknown state strings degrade to Active rather than erroring.
        let state_str: String = tree_row.get("state");
        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

        if !allow_archived && state == ResourceState::Archived {
            return Err(ArborError::InvalidState { message: format!("Tree {} is archived, use tree_get_archived()", tree_id) });
        }

        let root_node_id: String = tree_row.get("root_node_id");
        let root_node_id = ArborId::parse_str(&root_node_id)
            .map_err(|e| format!("Invalid root node ID: {}", e))?;

        let nodes = self.get_nodes_for_tree(tree_id).await?;

        let refs = self.get_tree_refs(tree_id).await?;

        // Unparseable metadata JSON is silently treated as None.
        let metadata_json: Option<String> = tree_row.get("metadata");
        let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());

        Ok(Tree {
            id: *tree_id,
            root: root_node_id,
            nodes,
            state: Some(state),
            refs: Some(refs),
            scheduled_deletion_at: tree_row.get("scheduled_deletion_at"),
            archived_at: tree_row.get("archived_at"),
            created_at: tree_row.get("created_at"),
            updated_at: tree_row.get("updated_at"),
            metadata,
        })
    }
294
    /// Loads every node belonging to `tree_id` (all states) into a map
    /// keyed by node id.
    ///
    /// PERF: issues one child-list query and one ref-count query per node
    /// (N+1 pattern); acceptable for small trees, revisit with JOINs if
    /// trees grow large.
    async fn get_nodes_for_tree(&self, tree_id: &TreeId) -> Result<HashMap<NodeId, Node>, ArborError> {
        let rows = sqlx::query(
            "SELECT id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, archived_at,
                    node_type, content, handle_plugin_id, handle_version, handle_method,
                    handle_meta, created_at, metadata
             FROM nodes WHERE tree_id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to fetch nodes: {}", e))?;

        let mut nodes = HashMap::new();

        for row in rows {
            let node_id_str: String = row.get("id");
            let node_id = ArborId::parse_str(&node_id_str)
                .map_err(|e| format!("Invalid node ID: {}", e))?;

            let parent_id_str: Option<String> = row.get("parent_id");
            let parent_id = parent_id_str
                .map(|s| ArborId::parse_str(&s).map_err(|e| format!("Invalid parent ID: {}", e)))
                .transpose()?;

            // Ordered child list comes from the node_children table.
            let children = self.get_node_children_internal(&node_id).await?;

            // Reconstruct the node payload from its type discriminator.
            let node_type_str: String = row.get("node_type");
            let data = match node_type_str.as_str() {
                "text" => {
                    let content: String = row.get("content");
                    NodeType::Text { content }
                }
                "external" => {
                    let plugin_id_str: String = row.get("handle_plugin_id");
                    let plugin_id = Uuid::parse_str(&plugin_id_str)
                        .map_err(|e| format!("Invalid handle plugin_id: {}", e))?;
                    let version: String = row.get("handle_version");
                    let method: String = row.get("handle_method");
                    // Missing or unparseable handle_meta degrades to an
                    // empty list rather than erroring.
                    let meta_json: Option<String> = row.get("handle_meta");
                    let meta: Vec<String> = meta_json
                        .and_then(|s| serde_json::from_str(&s).ok())
                        .unwrap_or_default();

                    NodeType::External {
                        handle: Handle::new(plugin_id, version, method)
                            .with_meta(meta),
                    }
                }
                _ => return Err(format!("Unknown node type: {}", node_type_str).into()),
            };

            // Unknown state strings degrade to Active.
            let state_str: String = row.get("state");
            let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

            let refs = self.get_node_refs(&node_id).await?;

            // Unparseable metadata JSON is silently treated as None.
            let metadata_json: Option<String> = row.get("metadata");
            let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());

            let node = Node {
                id: node_id,
                parent: parent_id,
                children,
                data,
                state: Some(state),
                refs: Some(refs),
                scheduled_deletion_at: row.get("scheduled_deletion_at"),
                archived_at: row.get("archived_at"),
                created_at: row.get("created_at"),
                metadata,
            };

            nodes.insert(node_id, node);
        }

        Ok(nodes)
    }
374
375 async fn get_node_children_internal(&self, node_id: &NodeId) -> Result<Vec<NodeId>, ArborError> {
377 let rows = sqlx::query(
378 "SELECT child_id FROM node_children WHERE parent_id = ? ORDER BY position",
379 )
380 .bind(node_id.to_string())
381 .fetch_all(&self.pool)
382 .await
383 .map_err(|e| format!("Failed to fetch node children: {}", e))?;
384
385 let children: Result<Vec<NodeId>, ArborError> = rows
386 .iter()
387 .map(|row| {
388 let child_id_str: String = row.get("child_id");
389 ArborId::parse_str(&child_id_str)
390 .map_err(|e| e.into())
391 })
392 .collect();
393
394 children
395 }
396
    /// Appends `child_id` to `parent_id`'s ordered child list, assigning the
    /// next free position.
    ///
    /// NOTE(review): the MAX(position) read and the INSERT are two separate
    /// statements outside a transaction, so concurrent appends to the same
    /// parent could race on the position value — confirm callers serialize
    /// writes, or wrap in a transaction.
    async fn add_child_to_parent(&self, parent_id: &NodeId, child_id: &NodeId) -> Result<(), ArborError> {
        // COALESCE(..., -1) + 1 yields position 0 for the first child.
        let row = sqlx::query(
            "SELECT COALESCE(MAX(position), -1) + 1 as next_pos FROM node_children WHERE parent_id = ?",
        )
        .bind(parent_id.to_string())
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to get next position: {}", e))?;

        let next_pos: i64 = row.get("next_pos");

        sqlx::query(
            "INSERT INTO node_children (parent_id, child_id, position) VALUES (?, ?, ?)",
        )
        .bind(parent_id.to_string())
        .bind(child_id.to_string())
        .bind(next_pos)
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to add child to parent: {}", e))?;

        Ok(())
    }
422
423 async fn get_tree_refs(&self, tree_id: &TreeId) -> Result<ResourceRefs, ArborError> {
425 let rows = sqlx::query(
426 "SELECT owner_id, count FROM tree_refs WHERE tree_id = ?",
427 )
428 .bind(tree_id.to_string())
429 .fetch_all(&self.pool)
430 .await
431 .map_err(|e| format!("Failed to fetch tree refs: {}", e))?;
432
433 let mut owners = HashMap::new();
434 let mut total = 0i64;
435
436 for row in rows {
437 let owner_id: String = row.get("owner_id");
438 let count: i64 = row.get("count");
439 owners.insert(owner_id, count);
440 total += count;
441 }
442
443 Ok(ResourceRefs {
444 ref_count: total,
445 owners,
446 })
447 }
448
449 async fn get_node_refs(&self, node_id: &NodeId) -> Result<ResourceRefs, ArborError> {
451 let rows = sqlx::query(
452 "SELECT owner_id, count FROM node_refs WHERE node_id = ?",
453 )
454 .bind(node_id.to_string())
455 .fetch_all(&self.pool)
456 .await
457 .map_err(|e| format!("Failed to fetch node refs: {}", e))?;
458
459 let mut owners = HashMap::new();
460 let mut total = 0i64;
461
462 for row in rows {
463 let owner_id: String = row.get("owner_id");
464 let count: i64 = row.get("count");
465 owners.insert(owner_id, count);
466 total += count;
467 }
468
469 Ok(ResourceRefs {
470 ref_count: total,
471 owners,
472 })
473 }
474
475 pub async fn tree_list(&self, include_scheduled: bool) -> Result<Vec<TreeId>, ArborError> {
477 let query = if include_scheduled {
478 "SELECT id FROM trees WHERE state IN ('active', 'scheduled_delete')"
479 } else {
480 "SELECT id FROM trees WHERE state = 'active'"
481 };
482
483 let rows = sqlx::query(query)
484 .fetch_all(&self.pool)
485 .await
486 .map_err(|e| format!("Failed to list trees: {}", e))?;
487
488 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
489 .iter()
490 .map(|row| {
491 let id_str: String = row.get("id");
492 ArborId::parse_str(&id_str)
493 .map_err(|e| format!("Invalid tree ID: {}", e).into())
494 })
495 .collect();
496
497 tree_ids
498 }
499
500 pub async fn tree_list_scheduled(&self) -> Result<Vec<TreeId>, ArborError> {
502 let rows = sqlx::query(
503 "SELECT id FROM trees WHERE state = 'scheduled_delete'",
504 )
505 .fetch_all(&self.pool)
506 .await
507 .map_err(|e| format!("Failed to list scheduled trees: {}", e))?;
508
509 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
510 .iter()
511 .map(|row| {
512 let id_str: String = row.get("id");
513 ArborId::parse_str(&id_str)
514 .map_err(|e| format!("Invalid tree ID: {}", e).into())
515 })
516 .collect();
517
518 tree_ids
519 }
520
521 pub async fn tree_list_archived(&self) -> Result<Vec<TreeId>, ArborError> {
523 let rows = sqlx::query(
524 "SELECT id FROM trees WHERE state = 'archived'",
525 )
526 .fetch_all(&self.pool)
527 .await
528 .map_err(|e| format!("Failed to list archived trees: {}", e))?;
529
530 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
531 .iter()
532 .map(|row| {
533 let id_str: String = row.get("id");
534 ArborId::parse_str(&id_str)
535 .map_err(|e| format!("Invalid tree ID: {}", e).into())
536 })
537 .collect();
538
539 tree_ids
540 }
541
    /// Replaces the metadata JSON of an active tree and bumps `updated_at`.
    ///
    /// NOTE(review): the UPDATE is filtered to `state = 'active'` and the
    /// affected-row count is not checked, so updating a missing, archived,
    /// or scheduled-delete tree silently succeeds as a no-op — confirm
    /// whether callers rely on that, or whether this should return
    /// `TreeNotFound`.
    pub async fn tree_update_metadata(
        &self,
        tree_id: &TreeId,
        metadata: Value,
    ) -> Result<(), ArborError> {
        let now = current_timestamp();
        let metadata_json = serde_json::to_string(&metadata)
            .map_err(|e| format!("Failed to serialize metadata: {}", e))?;

        sqlx::query(
            "UPDATE trees SET metadata = ?, updated_at = ? WHERE id = ? AND state = 'active'",
        )
        .bind(metadata_json)
        .bind(now)
        .bind(tree_id.to_string())
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to update tree metadata: {}", e))?;

        Ok(())
    }
564
    /// Adds `count` references to a tree on behalf of `owner_id`, inside a
    /// single transaction. Claiming a tree that is scheduled for deletion
    /// reactivates it; claiming an archived tree is an error.
    ///
    /// Returns the tree's total reference count after the claim.
    pub async fn tree_claim(
        &self,
        tree_id: &TreeId,
        owner_id: &str,
        count: i64,
    ) -> Result<i64, ArborError> {
        let now = current_timestamp();
        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;

        let tree_row = sqlx::query(
            "SELECT state, scheduled_deletion_at FROM trees WHERE id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_optional(&mut *tx)
        .await
        .map_err(|e| ArborError::StorageError { operation: "fetch_tree".to_string(), detail: e.to_string() })?
        .ok_or_else(|| ArborError::TreeNotFound { tree_id: tree_id.to_string() })?;

        let state_str: String = tree_row.get("state");
        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

        // Archived trees cannot be claimed; the transaction is dropped
        // (rolled back) on this early return.
        if state == ResourceState::Archived {
            return Err(ArborError::InvalidState { message: format!("Cannot claim archived tree {}", tree_id) });
        }

        // A claim rescues a tree that was pending deletion.
        if state == ResourceState::ScheduledDelete {
            sqlx::query(
                "UPDATE trees SET state = 'active', scheduled_deletion_at = NULL, updated_at = ? WHERE id = ?",
            )
            .bind(now)
            .bind(tree_id.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to reactivate tree: {}", e))?;
        }

        // Upsert the per-owner row: in DO UPDATE, bare `count` is the
        // existing row's value and `excluded.count` the attempted insert's.
        sqlx::query(
            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(tree_id, owner_id) DO UPDATE SET
                count = count + excluded.count,
                claimed_at = excluded.claimed_at",
        )
        .bind(tree_id.to_string())
        .bind(owner_id)
        .bind(count)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to claim tree: {}", e))?;

        // Keep the denormalized total on the tree row in sync.
        sqlx::query(
            "UPDATE trees SET ref_count = ref_count + ?, updated_at = ? WHERE id = ?",
        )
        .bind(count)
        .bind(now)
        .bind(tree_id.to_string())
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;

        // Read the total back inside the same transaction for consistency.
        let new_count_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
            .bind(tree_id.to_string())
            .fetch_one(&mut *tx)
            .await
            .map_err(|e| format!("Failed to fetch new ref_count: {}", e))?;

        let new_count: i64 = new_count_row.get("ref_count");

        tx.commit().await.map_err(|e| e.to_string())?;
        Ok(new_count)
    }
643
    /// Removes `count` references held by `owner_id`, inside a single
    /// transaction. When the tree's total reference count reaches zero it
    /// is moved to `scheduled_delete` with `scheduled_deletion_at = now`.
    ///
    /// Errors if the owner holds no reference or fewer than `count`.
    /// Returns the tree's total reference count after the release.
    pub async fn tree_release(
        &self,
        tree_id: &TreeId,
        owner_id: &str,
        count: i64,
    ) -> Result<i64, ArborError> {
        let now = current_timestamp();
        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;

        let owner_ref = sqlx::query(
            "SELECT count FROM tree_refs WHERE tree_id = ? AND owner_id = ?",
        )
        .bind(tree_id.to_string())
        .bind(owner_id)
        .fetch_optional(&mut *tx)
        .await
        .map_err(|e| format!("Failed to fetch tree ref: {}", e))?
        .ok_or_else(|| format!("No reference found for owner {}", owner_id))?;

        // Refuse to release more references than this owner holds.
        let current_count: i64 = owner_ref.get("count");
        if current_count < count {
            return Err(format!(
                "Cannot release {} references, owner only has {}",
                count, current_count
            )
            .into());
        }

        let new_count = current_count - count;

        // Drop the owner row entirely when its count hits zero.
        if new_count == 0 {
            sqlx::query("DELETE FROM tree_refs WHERE tree_id = ? AND owner_id = ?")
                .bind(tree_id.to_string())
                .bind(owner_id)
                .execute(&mut *tx)
                .await
                .map_err(|e| format!("Failed to delete tree ref: {}", e))?;
        } else {
            sqlx::query(
                "UPDATE tree_refs SET count = ?, claimed_at = ? WHERE tree_id = ? AND owner_id = ?",
            )
            .bind(new_count)
            .bind(now)
            .bind(tree_id.to_string())
            .bind(owner_id)
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to update tree ref: {}", e))?;
        }

        // Keep the denormalized total on the tree row in sync.
        sqlx::query(
            "UPDATE trees SET ref_count = ref_count - ?, updated_at = ? WHERE id = ?",
        )
        .bind(count)
        .bind(now)
        .bind(tree_id.to_string())
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;

        // When no references remain anywhere, schedule the tree for
        // deletion (a later claim can still rescue it).
        let tree_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
            .bind(tree_id.to_string())
            .fetch_one(&mut *tx)
            .await
            .map_err(|e| format!("Failed to fetch tree: {}", e))?;

        let ref_count: i64 = tree_row.get("ref_count");
        if ref_count == 0 {
            sqlx::query(
                "UPDATE trees SET state = 'scheduled_delete', scheduled_deletion_at = ?, updated_at = ? WHERE id = ?",
            )
            .bind(now)
            .bind(now)
            .bind(tree_id.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to schedule tree deletion: {}", e))?;
        }

        tx.commit().await.map_err(|e| e.to_string())?;
        Ok(ref_count)
    }
731
    /// Inserts a new active text node into `tree_id`, optionally linked
    /// under `parent`. Returns the new node's id.
    ///
    /// NOTE(review): the node insert and the child-link insert are not in
    /// one transaction; a failure in between leaves a node with a
    /// `parent_id` but no `node_children` row.
    pub async fn node_create_text(
        &self,
        tree_id: &TreeId,
        parent: Option<NodeId>,
        content: String,
        metadata: Option<Value>,
    ) -> Result<NodeId, ArborError> {
        let node_id = NodeId::new();
        let now = current_timestamp();

        // serde_json::to_string on a Value only fails for non-string map
        // keys, which Value cannot represent, so unwrap is safe here.
        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());

        sqlx::query(
            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, metadata, created_at)
             VALUES (?, ?, ?, 1, 'active', 'text', ?, ?, ?)",
        )
        .bind(node_id.to_string())
        .bind(tree_id.to_string())
        .bind(parent.map(|p| p.to_string()))
        .bind(&content)
        .bind(metadata_json)
        .bind(now)
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to create text node: {}", e))?;

        // Maintain the ordered child list alongside the parent_id column.
        if let Some(parent_id) = parent {
            self.add_child_to_parent(&parent_id, &node_id).await?;
        }

        Ok(node_id)
    }
766
    /// Inserts a new active external node (plugin handle) into `tree_id`,
    /// optionally linked under `parent`. Returns the new node's id.
    ///
    /// NOTE(review): like `node_create_text`, the node insert and the
    /// child-link insert are not transactional.
    pub async fn node_create_external(
        &self,
        tree_id: &TreeId,
        parent: Option<NodeId>,
        handle: Handle,
        metadata: Option<Value>,
    ) -> Result<NodeId, ArborError> {
        let node_id = NodeId::new();
        let now = current_timestamp();

        // serde_json::to_string on a Value / Vec<String> cannot fail for
        // these inputs, so unwrap is safe here.
        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
        let meta_json = serde_json::to_string(&handle.meta).unwrap();

        // The handle is flattened into dedicated columns; content stays
        // NULL for external nodes.
        sqlx::query(
            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
             VALUES (?, ?, ?, 1, 'active', 'external', ?, ?, ?, ?, ?, ?)",
        )
        .bind(node_id.to_string())
        .bind(tree_id.to_string())
        .bind(parent.map(|p| p.to_string()))
        .bind(handle.plugin_id.to_string())
        .bind(&handle.version)
        .bind(&handle.method)
        .bind(&meta_json)
        .bind(metadata_json)
        .bind(now)
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to create external node: {}", e))?;

        if let Some(parent_id) = parent {
            self.add_child_to_parent(&parent_id, &node_id).await?;
        }

        Ok(node_id)
    }
805
    /// Inserts an external node that is born ephemeral: ref_count 0, state
    /// 'scheduled_delete', and scheduled_deletion_at set to creation time —
    /// it persists only until cleanup unless something claims it.
    /// Returns the new node's id.
    pub async fn node_create_external_ephemeral(
        &self,
        tree_id: &TreeId,
        parent: Option<NodeId>,
        handle: Handle,
        metadata: Option<Value>,
    ) -> Result<NodeId, ArborError> {
        let node_id = NodeId::new();
        let now = current_timestamp();

        // serde_json::to_string on a Value / Vec<String> cannot fail for
        // these inputs, so unwrap is safe here.
        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
        let meta_json = serde_json::to_string(&handle.meta).unwrap();

        sqlx::query(
            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
             VALUES (?, ?, ?, 0, 'scheduled_delete', ?, 'external', ?, ?, ?, ?, ?, ?)",
        )
        .bind(node_id.to_string())
        .bind(tree_id.to_string())
        .bind(parent.map(|p| p.to_string()))
        .bind(now)
        .bind(handle.plugin_id.to_string())
        .bind(&handle.version)
        .bind(&handle.method)
        .bind(&meta_json)
        .bind(metadata_json)
        .bind(now)
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to create ephemeral external node: {}", e))?;

        if let Some(parent_id) = parent {
            self.add_child_to_parent(&parent_id, &node_id).await?;
        }

        Ok(node_id)
    }
845
846 pub async fn node_get(
848 &self,
849 tree_id: &TreeId,
850 node_id: &NodeId,
851 ) -> Result<Node, ArborError> {
852 let nodes = self.get_nodes_for_tree(tree_id).await?;
853 nodes
854 .get(node_id)
855 .cloned()
856 .ok_or_else(|| ArborError::NodeNotFound { node_id: node_id.to_string(), tree_id: tree_id.to_string() })
857 }
858
    /// Returns the ids of `node_id`'s *active* children, resolved via the
    /// `parent_id` column.
    ///
    /// NOTE(review): unlike the internal `get_node_children_internal`
    /// (which reads `node_children` ORDER BY position), this query has no
    /// ORDER BY, so the returned order is unspecified — confirm callers
    /// don't rely on sibling ordering here.
    pub async fn node_get_children(
        &self,
        tree_id: &TreeId,
        node_id: &NodeId,
    ) -> Result<Vec<NodeId>, ArborError> {
        let rows = sqlx::query(
            "SELECT id FROM nodes WHERE tree_id = ? AND parent_id = ? AND state = 'active'",
        )
        .bind(tree_id.to_string())
        .bind(node_id.to_string())
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to fetch children: {}", e))?;

        let children: Result<Vec<NodeId>, ArborError> = rows
            .iter()
            .map(|row| {
                let id_str: String = row.get("id");
                ArborId::parse_str(&id_str)
                    .map_err(|e| format!("Invalid node ID: {}", e).into())
            })
            .collect();

        children
    }
885
886 pub async fn node_get_parent(
888 &self,
889 tree_id: &TreeId,
890 node_id: &NodeId,
891 ) -> Result<Option<NodeId>, ArborError> {
892 let row = sqlx::query(
893 "SELECT parent_id FROM nodes WHERE tree_id = ? AND id = ?",
894 )
895 .bind(tree_id.to_string())
896 .bind(node_id.to_string())
897 .fetch_optional(&self.pool)
898 .await
899 .map_err(|e| ArborError::StorageError { operation: "fetch_parent".to_string(), detail: e.to_string() })?
900 .ok_or_else(|| ArborError::NodeNotFound { node_id: node_id.to_string(), tree_id: tree_id.to_string() })?;
901
902 let parent_id: Option<String> = row.get("parent_id");
903 match parent_id {
904 Some(id_str) => Ok(Some(
905 ArborId::parse_str(&id_str)
906 .map_err(|e| format!("Invalid parent ID: {}", e))?,
907 )),
908 None => Ok(None),
909 }
910 }
911
912 pub async fn node_get_path(
914 &self,
915 tree_id: &TreeId,
916 node_id: &NodeId,
917 ) -> Result<Vec<NodeId>, ArborError> {
918 let mut path = Vec::new();
919 let mut current_id = Some(*node_id);
920
921 while let Some(id) = current_id {
923 path.push(id);
924 current_id = self.node_get_parent(tree_id, &id).await?;
925 }
926
927 path.reverse();
929 Ok(path)
930 }
931
    /// Returns the ids of the tree's active leaf nodes — active nodes that
    /// no other node in the tree names as its parent.
    ///
    /// NOTE(review): leafness is derived from the `parent_id` column, not
    /// the `node_children` table, and a node whose only children are
    /// inactive still counts as a non-leaf — confirm that is intended.
    pub async fn context_list_leaves(
        &self,
        tree_id: &TreeId,
    ) -> Result<Vec<NodeId>, ArborError> {
        let rows = sqlx::query(
            "SELECT id FROM nodes
             WHERE tree_id = ? AND state = 'active'
             AND id NOT IN (SELECT DISTINCT parent_id FROM nodes WHERE parent_id IS NOT NULL AND tree_id = ?)",
        )
        .bind(tree_id.to_string())
        .bind(tree_id.to_string())
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to fetch leaf nodes: {}", e))?;

        let leaves: Result<Vec<NodeId>, ArborError> = rows
            .iter()
            .map(|row| {
                let id_str: String = row.get("id");
                ArborId::parse_str(&id_str)
                    .map_err(|e| format!("Invalid node ID: {}", e).into())
            })
            .collect();

        leaves
    }
959
960 pub async fn context_get_path(
962 &self,
963 tree_id: &TreeId,
964 node_id: &NodeId,
965 ) -> Result<Vec<Node>, ArborError> {
966 let path_ids = self.node_get_path(tree_id, node_id).await?;
967 let nodes = self.get_nodes_for_tree(tree_id).await?;
968
969 let path_nodes: Result<Vec<Node>, ArborError> = path_ids
970 .iter()
971 .map(|id| {
972 nodes
973 .get(id)
974 .cloned()
975 .ok_or_else(|| ArborError::NodeNotFound { node_id: id.to_string(), tree_id: tree_id.to_string() })
976 })
977 .collect();
978
979 path_nodes
980 }
981
982 pub async fn context_get_handles(
984 &self,
985 tree_id: &TreeId,
986 node_id: &NodeId,
987 ) -> Result<Vec<Handle>, ArborError> {
988 let path_nodes = self.context_get_path(tree_id, node_id).await?;
989
990 let handles: Vec<Handle> = path_nodes
991 .iter()
992 .filter_map(|node| match &node.data {
993 NodeType::External { handle } => Some(handle.clone()),
994 NodeType::Text { .. } => None,
995 })
996 .collect();
997
998 Ok(handles)
999 }
1000
1001 pub async fn cleanup_scheduled_trees(&self) -> Result<usize, ArborError> {
1003 let now = current_timestamp();
1004 let seven_days_ago = now - (7 * 24 * 60 * 60);
1005
1006 let result = sqlx::query(
1007 "UPDATE trees
1008 SET state = 'archived', archived_at = ?, updated_at = ?
1009 WHERE state = 'scheduled_delete' AND scheduled_deletion_at < ?",
1010 )
1011 .bind(now)
1012 .bind(now)
1013 .bind(seven_days_ago)
1014 .execute(&self.pool)
1015 .await
1016 .map_err(|e| format!("Failed to archive trees: {}", e))?;
1017
1018 Ok(result.rows_affected() as usize)
1019 }
1020
1021 pub async fn tree_query_by_metadata(&self, filter: &Value) -> Result<Vec<TreeId>, ArborError> {
1023 let rows = sqlx::query(
1024 "SELECT id, metadata FROM trees WHERE state = 'active'"
1025 )
1026 .fetch_all(&self.pool)
1027 .await
1028 .map_err(|e| format!("Failed to query trees: {}", e))?;
1029
1030 let mut matching_trees = Vec::new();
1031
1032 for row in rows {
1033 let tree_id_str: String = row.get("id");
1034 let metadata_str: Option<String> = row.get("metadata");
1035
1036 if let Some(metadata_json) = metadata_str {
1037 if let Ok(metadata) = serde_json::from_str::<Value>(&metadata_json) {
1038 if metadata_matches(filter, &metadata) {
1040 if let Ok(tree_id) = ArborId::parse_str(&tree_id_str) {
1041 matching_trees.push(tree_id);
1042 }
1043 }
1044 }
1045 }
1046 }
1047
1048 Ok(matching_trees)
1049 }
1050}
1051
1052fn metadata_matches(filter: &Value, metadata: &Value) -> bool {
1054 match (filter, metadata) {
1055 (Value::Object(filter_obj), Value::Object(meta_obj)) => {
1056 filter_obj.iter().all(|(key, filter_value)| {
1058 meta_obj.get(key).map_or(false, |meta_value| {
1059 metadata_matches(filter_value, meta_value)
1060 })
1061 })
1062 }
1063 (Value::Array(filter_arr), Value::Array(meta_arr)) => {
1064 filter_arr == meta_arr
1066 }
1067 (filter_val, meta_val) => filter_val == meta_val,
1068 }
1069}
1070
/// Current wall-clock time as whole seconds since the Unix epoch, in the
/// i64 form SQLite INTEGER columns expect.
fn current_timestamp() -> i64 {
    // Panics only if the system clock reads earlier than 1970, which is a
    // broken-environment invariant rather than a recoverable error.
    let since_epoch = UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_secs() as i64
}