1use super::types::{
2 ArborError, ArborId, Node, NodeId, NodeType, ResourceRefs, ResourceState, Tree, TreeId, Handle,
3};
4use serde_json::Value;
5use sqlx::{sqlite::{SqliteConnectOptions, SqlitePool}, ConnectOptions, Row};
6use uuid::Uuid;
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::time::{SystemTime, UNIX_EPOCH};
10
/// Configuration for [`ArborStorage`].
#[derive(Debug, Clone)]
pub struct ArborConfig {
    /// Seconds a `scheduled_delete` resource is kept before being archived.
    pub scheduled_deletion_window: i64,
    /// Seconds an archived resource is retained (presumably before purge;
    /// not enforced in the visible code — TODO confirm against callers).
    pub archive_window: i64,
    /// Path to the SQLite database file (opened with `mode=rwc`).
    pub db_path: PathBuf,
    /// Whether cleanup should run automatically (read by code outside this
    /// file; not used here).
    pub auto_cleanup: bool,
    /// Seconds between cleanup passes (not used in the visible code).
    pub cleanup_interval: i64,
}
29
30impl Default for ArborConfig {
31 fn default() -> Self {
32 Self {
33 scheduled_deletion_window: 604800, archive_window: 2592000, db_path: PathBuf::from("arbor.db"),
36 auto_cleanup: true,
37 cleanup_interval: 3600, }
39 }
40}
41
/// SQLite-backed storage engine for Arbor trees and nodes.
pub struct ArborStorage {
    /// Connection pool to the SQLite database.
    pool: SqlitePool,
    /// Configuration the storage was created with; kept even though not
    /// every field is read inside this file.
    #[allow(dead_code)]
    config: ArborConfig,
}
71
72impl ArborStorage {
73 pub async fn new(config: ArborConfig) -> Result<Self, ArborError> {
75 let db_url = format!("sqlite:{}?mode=rwc", config.db_path.display());
76 let connect_options: SqliteConnectOptions = db_url.parse()
77 .map_err(|e| format!("Failed to parse database URL: {}", e))?;
78 let connect_options = connect_options.disable_statement_logging();
79 let pool = SqlitePool::connect_with(connect_options.clone())
80 .await
81 .map_err(|e| format!("Failed to connect to database: {}", e))?;
82
83 let storage = Self { pool, config };
84 storage.run_migrations().await?;
85
86 Ok(storage)
87 }
88
    /// Create all tables and indexes if they do not already exist.
    ///
    /// Idempotent: every statement uses `IF NOT EXISTS`, so this is safe to
    /// run on every startup.
    ///
    /// NOTE(review): the whole script goes through one `sqlx::query` call;
    /// this relies on the pinned sqlx/SQLite driver accepting a
    /// multi-statement batch — confirm against the sqlx version in use.
    async fn run_migrations(&self) -> Result<(), ArborError> {
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS trees (
                id TEXT PRIMARY KEY,
                root_node_id TEXT NOT NULL,
                ref_count INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'active',
                scheduled_deletion_at INTEGER,
                archived_at INTEGER,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL,
                metadata TEXT
            );

            CREATE TABLE IF NOT EXISTS tree_refs (
                tree_id TEXT NOT NULL,
                owner_id TEXT NOT NULL,
                count INTEGER NOT NULL DEFAULT 1,
                claimed_at INTEGER NOT NULL,
                PRIMARY KEY (tree_id, owner_id),
                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS nodes (
                id TEXT PRIMARY KEY,
                tree_id TEXT NOT NULL,
                parent_id TEXT,
                ref_count INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'active',
                scheduled_deletion_at INTEGER,
                archived_at INTEGER,
                node_type TEXT NOT NULL,
                content TEXT,
                handle_plugin_id TEXT,
                handle_version TEXT,
                handle_method TEXT,
                handle_meta TEXT,
                created_at INTEGER NOT NULL,
                metadata TEXT,
                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS node_refs (
                node_id TEXT NOT NULL,
                owner_id TEXT NOT NULL,
                count INTEGER NOT NULL DEFAULT 1,
                claimed_at INTEGER NOT NULL,
                PRIMARY KEY (node_id, owner_id),
                FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS node_children (
                parent_id TEXT NOT NULL,
                child_id TEXT NOT NULL,
                position INTEGER NOT NULL,
                PRIMARY KEY (parent_id, child_id),
                FOREIGN KEY (parent_id) REFERENCES nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (child_id) REFERENCES nodes(id) ON DELETE CASCADE
            );

            CREATE INDEX IF NOT EXISTS idx_trees_state ON trees(state);
            CREATE INDEX IF NOT EXISTS idx_trees_scheduled ON trees(scheduled_deletion_at) WHERE state = 'scheduled_delete';
            CREATE INDEX IF NOT EXISTS idx_trees_archived ON trees(archived_at) WHERE state = 'archived';
            CREATE INDEX IF NOT EXISTS idx_nodes_tree ON nodes(tree_id);
            CREATE INDEX IF NOT EXISTS idx_nodes_parent ON nodes(parent_id);
            CREATE INDEX IF NOT EXISTS idx_nodes_state ON nodes(state);
            CREATE INDEX IF NOT EXISTS idx_nodes_scheduled ON nodes(scheduled_deletion_at) WHERE state = 'scheduled_delete';
            CREATE INDEX IF NOT EXISTS idx_node_children_parent ON node_children(parent_id);
            CREATE INDEX IF NOT EXISTS idx_node_children_child ON node_children(child_id);
            "#,
        )
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to run migrations: {}", e))?;

        Ok(())
    }
168
    /// Create a new tree with a freshly generated ID, owned by `owner_id`.
    ///
    /// Thin delegate to [`Self::tree_create_with_id`] with no explicit ID.
    pub async fn tree_create(
        &self,
        metadata: Option<serde_json::Value>,
        owner_id: &str,
    ) -> Result<TreeId, ArborError> {
        self.tree_create_with_id(None, metadata, owner_id).await
    }
181
182 pub async fn tree_create_with_id(
184 &self,
185 tree_id: Option<TreeId>,
186 metadata: Option<serde_json::Value>,
187 owner_id: &str,
188 ) -> Result<TreeId, ArborError> {
189 let tree_id = tree_id.unwrap_or_else(TreeId::new);
190 let root_id = NodeId::new();
191 let now = current_timestamp();
192
193 let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
194
195 let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
197 sqlx::query(
198 "INSERT INTO trees (id, root_node_id, ref_count, state, created_at, updated_at, metadata)
199 VALUES (?, ?, 1, 'active', ?, ?, ?)",
200 )
201 .bind(tree_id.to_string())
202 .bind(root_id.to_string())
203 .bind(now)
204 .bind(now)
205 .bind(metadata_json)
206 .execute(&mut *tx)
207 .await
208 .map_err(|e| format!("Failed to create tree: {}", e))?;
209
210 sqlx::query(
212 "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at) VALUES (?, ?, 1, ?)",
213 )
214 .bind(tree_id.to_string())
215 .bind(owner_id)
216 .bind(now)
217 .execute(&mut *tx)
218 .await
219 .map_err(|e| format!("Failed to create tree ref: {}", e))?;
220
221 sqlx::query(
223 "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, created_at)
224 VALUES (?, ?, NULL, 1, 'active', 'text', '', ?)",
225 )
226 .bind(root_id.to_string())
227 .bind(tree_id.to_string())
228 .bind(now)
229 .execute(&mut *tx)
230 .await
231 .map_err(|e| format!("Failed to create root node: {}", e))?;
232
233 tx.commit().await.map_err(|e| e.to_string())?;
234
235 Ok(tree_id)
236 }
237
    /// Fetch a tree with all of its nodes and refs; errors if it is archived.
    pub async fn tree_get(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
        self.tree_get_internal(tree_id, false).await
    }
242
    /// Fetch a tree, allowing the `archived` state (read-only inspection).
    pub async fn tree_get_archived(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
        self.tree_get_internal(tree_id, true).await
    }
247
    /// Shared implementation behind [`Self::tree_get`] and
    /// [`Self::tree_get_archived`]: loads the tree row, all nodes, and the
    /// per-owner reference table.
    async fn tree_get_internal(
        &self,
        tree_id: &TreeId,
        allow_archived: bool,
    ) -> Result<Tree, ArborError> {
        let tree_row = sqlx::query(
            "SELECT id, root_node_id, ref_count, state, scheduled_deletion_at, archived_at, created_at, updated_at, metadata
             FROM trees WHERE id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| format!("Failed to fetch tree: {}", e))?
        .ok_or_else(|| format!("Tree not found: {}", tree_id))?;

        // Unknown state strings silently degrade to Active — TODO confirm
        // this fallback is intended rather than an error.
        let state_str: String = tree_row.get("state");
        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

        if !allow_archived && state == ResourceState::Archived {
            return Err("Tree is archived, use tree_get_archived()".into());
        }

        let root_node_id: String = tree_row.get("root_node_id");
        let root_node_id = ArborId::parse_str(&root_node_id)
            .map_err(|e| format!("Invalid root node ID: {}", e))?;

        // Materializes every node (see get_nodes_for_tree for cost notes).
        let nodes = self.get_nodes_for_tree(tree_id).await?;

        let refs = self.get_tree_refs(tree_id).await?;

        // Unparseable metadata is dropped rather than surfaced as an error.
        let metadata_json: Option<String> = tree_row.get("metadata");
        let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());

        Ok(Tree {
            id: *tree_id,
            root: root_node_id,
            nodes,
            state: Some(state),
            refs: Some(refs),
            scheduled_deletion_at: tree_row.get("scheduled_deletion_at"),
            archived_at: tree_row.get("archived_at"),
            created_at: tree_row.get("created_at"),
            updated_at: tree_row.get("updated_at"),
            metadata,
        })
    }
298
    /// Load every node of `tree_id` (regardless of state) into a map.
    ///
    /// NOTE(review): this is an N+1 pattern — for each node it issues two
    /// extra queries (children and refs). Fine for small trees; batching
    /// would be needed for large ones.
    async fn get_nodes_for_tree(&self, tree_id: &TreeId) -> Result<HashMap<NodeId, Node>, ArborError> {
        let rows = sqlx::query(
            "SELECT id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, archived_at,
                    node_type, content, handle_plugin_id, handle_version, handle_method,
                    handle_meta, created_at, metadata
             FROM nodes WHERE tree_id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to fetch nodes: {}", e))?;

        let mut nodes = HashMap::new();

        for row in rows {
            let node_id_str: String = row.get("id");
            let node_id = ArborId::parse_str(&node_id_str)
                .map_err(|e| format!("Invalid node ID: {}", e))?;

            // parent_id is NULL for the root node.
            let parent_id_str: Option<String> = row.get("parent_id");
            let parent_id = parent_id_str
                .map(|s| ArborId::parse_str(&s).map_err(|e| format!("Invalid parent ID: {}", e)))
                .transpose()?;

            // Ordered child list (extra query per node).
            let children = self.get_node_children_internal(&node_id).await?;

            // Decode the type-specific payload from the flattened columns.
            let node_type_str: String = row.get("node_type");
            let data = match node_type_str.as_str() {
                "text" => {
                    let content: String = row.get("content");
                    NodeType::Text { content }
                }
                "external" => {
                    let plugin_id_str: String = row.get("handle_plugin_id");
                    let plugin_id = Uuid::parse_str(&plugin_id_str)
                        .map_err(|e| format!("Invalid handle plugin_id: {}", e))?;
                    let version: String = row.get("handle_version");
                    let method: String = row.get("handle_method");
                    // Unparseable / missing meta degrades to an empty list.
                    let meta_json: Option<String> = row.get("handle_meta");
                    let meta: Vec<String> = meta_json
                        .and_then(|s| serde_json::from_str(&s).ok())
                        .unwrap_or_default();

                    NodeType::External {
                        handle: Handle::new(plugin_id, version, method)
                            .with_meta(meta),
                    }
                }
                _ => return Err(format!("Unknown node type: {}", node_type_str).into()),
            };

            // Unknown state strings degrade to Active, as elsewhere.
            let state_str: String = row.get("state");
            let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

            // Per-owner refs (second extra query per node).
            let refs = self.get_node_refs(&node_id).await?;

            let metadata_json: Option<String> = row.get("metadata");
            let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());

            let node = Node {
                id: node_id,
                parent: parent_id,
                children,
                data,
                state: Some(state),
                refs: Some(refs),
                scheduled_deletion_at: row.get("scheduled_deletion_at"),
                archived_at: row.get("archived_at"),
                created_at: row.get("created_at"),
                metadata,
            };

            nodes.insert(node_id, node);
        }

        Ok(nodes)
    }
378
379 async fn get_node_children_internal(&self, node_id: &NodeId) -> Result<Vec<NodeId>, ArborError> {
381 let rows = sqlx::query(
382 "SELECT child_id FROM node_children WHERE parent_id = ? ORDER BY position",
383 )
384 .bind(node_id.to_string())
385 .fetch_all(&self.pool)
386 .await
387 .map_err(|e| format!("Failed to fetch node children: {}", e))?;
388
389 let children: Result<Vec<NodeId>, ArborError> = rows
390 .iter()
391 .map(|row| {
392 let child_id_str: String = row.get("child_id");
393 ArborId::parse_str(&child_id_str)
394 .map_err(|e| e.into())
395 })
396 .collect();
397
398 children
399 }
400
401 async fn add_child_to_parent(&self, parent_id: &NodeId, child_id: &NodeId) -> Result<(), ArborError> {
403 let row = sqlx::query(
405 "SELECT COALESCE(MAX(position), -1) + 1 as next_pos FROM node_children WHERE parent_id = ?",
406 )
407 .bind(parent_id.to_string())
408 .fetch_one(&self.pool)
409 .await
410 .map_err(|e| format!("Failed to get next position: {}", e))?;
411
412 let next_pos: i64 = row.get("next_pos");
413
414 sqlx::query(
415 "INSERT INTO node_children (parent_id, child_id, position) VALUES (?, ?, ?)",
416 )
417 .bind(parent_id.to_string())
418 .bind(child_id.to_string())
419 .bind(next_pos)
420 .execute(&self.pool)
421 .await
422 .map_err(|e| format!("Failed to add child to parent: {}", e))?;
423
424 Ok(())
425 }
426
427 async fn get_tree_refs(&self, tree_id: &TreeId) -> Result<ResourceRefs, ArborError> {
429 let rows = sqlx::query(
430 "SELECT owner_id, count FROM tree_refs WHERE tree_id = ?",
431 )
432 .bind(tree_id.to_string())
433 .fetch_all(&self.pool)
434 .await
435 .map_err(|e| format!("Failed to fetch tree refs: {}", e))?;
436
437 let mut owners = HashMap::new();
438 let mut total = 0i64;
439
440 for row in rows {
441 let owner_id: String = row.get("owner_id");
442 let count: i64 = row.get("count");
443 owners.insert(owner_id, count);
444 total += count;
445 }
446
447 Ok(ResourceRefs {
448 ref_count: total,
449 owners,
450 })
451 }
452
453 async fn get_node_refs(&self, node_id: &NodeId) -> Result<ResourceRefs, ArborError> {
455 let rows = sqlx::query(
456 "SELECT owner_id, count FROM node_refs WHERE node_id = ?",
457 )
458 .bind(node_id.to_string())
459 .fetch_all(&self.pool)
460 .await
461 .map_err(|e| format!("Failed to fetch node refs: {}", e))?;
462
463 let mut owners = HashMap::new();
464 let mut total = 0i64;
465
466 for row in rows {
467 let owner_id: String = row.get("owner_id");
468 let count: i64 = row.get("count");
469 owners.insert(owner_id, count);
470 total += count;
471 }
472
473 Ok(ResourceRefs {
474 ref_count: total,
475 owners,
476 })
477 }
478
479 pub async fn tree_list(&self, include_scheduled: bool) -> Result<Vec<TreeId>, ArborError> {
481 let query = if include_scheduled {
482 "SELECT id FROM trees WHERE state IN ('active', 'scheduled_delete')"
483 } else {
484 "SELECT id FROM trees WHERE state = 'active'"
485 };
486
487 let rows = sqlx::query(query)
488 .fetch_all(&self.pool)
489 .await
490 .map_err(|e| format!("Failed to list trees: {}", e))?;
491
492 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
493 .iter()
494 .map(|row| {
495 let id_str: String = row.get("id");
496 ArborId::parse_str(&id_str)
497 .map_err(|e| format!("Invalid tree ID: {}", e).into())
498 })
499 .collect();
500
501 tree_ids
502 }
503
504 pub async fn tree_list_scheduled(&self) -> Result<Vec<TreeId>, ArborError> {
506 let rows = sqlx::query(
507 "SELECT id FROM trees WHERE state = 'scheduled_delete'",
508 )
509 .fetch_all(&self.pool)
510 .await
511 .map_err(|e| format!("Failed to list scheduled trees: {}", e))?;
512
513 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
514 .iter()
515 .map(|row| {
516 let id_str: String = row.get("id");
517 ArborId::parse_str(&id_str)
518 .map_err(|e| format!("Invalid tree ID: {}", e).into())
519 })
520 .collect();
521
522 tree_ids
523 }
524
525 pub async fn tree_list_archived(&self) -> Result<Vec<TreeId>, ArborError> {
527 let rows = sqlx::query(
528 "SELECT id FROM trees WHERE state = 'archived'",
529 )
530 .fetch_all(&self.pool)
531 .await
532 .map_err(|e| format!("Failed to list archived trees: {}", e))?;
533
534 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
535 .iter()
536 .map(|row| {
537 let id_str: String = row.get("id");
538 ArborId::parse_str(&id_str)
539 .map_err(|e| format!("Invalid tree ID: {}", e).into())
540 })
541 .collect();
542
543 tree_ids
544 }
545
546 pub async fn tree_update_metadata(
548 &self,
549 tree_id: &TreeId,
550 metadata: Value,
551 ) -> Result<(), ArborError> {
552 let now = current_timestamp();
553 let metadata_json = serde_json::to_string(&metadata)
554 .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
555
556 sqlx::query(
557 "UPDATE trees SET metadata = ?, updated_at = ? WHERE id = ? AND state = 'active'",
558 )
559 .bind(metadata_json)
560 .bind(now)
561 .bind(tree_id.to_string())
562 .execute(&self.pool)
563 .await
564 .map_err(|e| format!("Failed to update tree metadata: {}", e))?;
565
566 Ok(())
567 }
568
    /// Add `count` references to `tree_id` on behalf of `owner_id`.
    ///
    /// Claiming a tree that is scheduled for deletion reactivates it.
    /// Archived trees cannot be claimed. Runs in a single transaction;
    /// returns the tree's new total `ref_count`.
    pub async fn tree_claim(
        &self,
        tree_id: &TreeId,
        owner_id: &str,
        count: i64,
    ) -> Result<i64, ArborError> {
        let now = current_timestamp();
        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;

        let tree_row = sqlx::query(
            "SELECT state, scheduled_deletion_at FROM trees WHERE id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_optional(&mut *tx)
        .await
        .map_err(|e| format!("Failed to fetch tree: {}", e))?
        .ok_or_else(|| format!("Tree not found: {}", tree_id))?;

        let state_str: String = tree_row.get("state");
        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

        // Early return drops `tx`, which rolls back (nothing written yet).
        if state == ResourceState::Archived {
            return Err("Cannot claim archived tree".into());
        }

        // A claim rescues a tree from pending deletion.
        if state == ResourceState::ScheduledDelete {
            sqlx::query(
                "UPDATE trees SET state = 'active', scheduled_deletion_at = NULL, updated_at = ? WHERE id = ?",
            )
            .bind(now)
            .bind(tree_id.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to reactivate tree: {}", e))?;
        }

        // Upsert: first claim inserts the row, repeat claims add to it.
        sqlx::query(
            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(tree_id, owner_id) DO UPDATE SET
                 count = count + excluded.count,
                 claimed_at = excluded.claimed_at",
        )
        .bind(tree_id.to_string())
        .bind(owner_id)
        .bind(count)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to claim tree: {}", e))?;

        // Keep the denormalized total on the tree row in sync.
        sqlx::query(
            "UPDATE trees SET ref_count = ref_count + ?, updated_at = ? WHERE id = ?",
        )
        .bind(count)
        .bind(now)
        .bind(tree_id.to_string())
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;

        // Read the total back inside the transaction so it is consistent.
        let new_count_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
            .bind(tree_id.to_string())
            .fetch_one(&mut *tx)
            .await
            .map_err(|e| format!("Failed to fetch new ref_count: {}", e))?;

        let new_count: i64 = new_count_row.get("ref_count");

        tx.commit().await.map_err(|e| e.to_string())?;
        Ok(new_count)
    }
647
    /// Release `count` references held by `owner_id` on `tree_id`.
    ///
    /// When the tree's total ref_count drops to zero it is moved to
    /// `scheduled_delete`. Runs in one transaction; returns the tree's
    /// remaining total ref_count.
    pub async fn tree_release(
        &self,
        tree_id: &TreeId,
        owner_id: &str,
        count: i64,
    ) -> Result<i64, ArborError> {
        let now = current_timestamp();
        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;

        let owner_ref = sqlx::query(
            "SELECT count FROM tree_refs WHERE tree_id = ? AND owner_id = ?",
        )
        .bind(tree_id.to_string())
        .bind(owner_id)
        .fetch_optional(&mut *tx)
        .await
        .map_err(|e| format!("Failed to fetch tree ref: {}", e))?
        .ok_or_else(|| format!("No reference found for owner {}", owner_id))?;

        // An owner can never release more than it holds; the early return
        // drops `tx`, rolling back (nothing has been written yet).
        let current_count: i64 = owner_ref.get("count");
        if current_count < count {
            return Err(format!(
                "Cannot release {} references, owner only has {}",
                count, current_count
            )
            .into());
        }

        // Owner's remaining count (not the tree total).
        let new_count = current_count - count;

        // Fully released owners lose their row; partial releases update it.
        if new_count == 0 {
            sqlx::query("DELETE FROM tree_refs WHERE tree_id = ? AND owner_id = ?")
                .bind(tree_id.to_string())
                .bind(owner_id)
                .execute(&mut *tx)
                .await
                .map_err(|e| format!("Failed to delete tree ref: {}", e))?;
        } else {
            sqlx::query(
                "UPDATE tree_refs SET count = ?, claimed_at = ? WHERE tree_id = ? AND owner_id = ?",
            )
            .bind(new_count)
            .bind(now)
            .bind(tree_id.to_string())
            .bind(owner_id)
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to update tree ref: {}", e))?;
        }

        // Keep the denormalized total on the tree row in sync.
        sqlx::query(
            "UPDATE trees SET ref_count = ref_count - ?, updated_at = ? WHERE id = ?",
        )
        .bind(count)
        .bind(now)
        .bind(tree_id.to_string())
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;

        // Re-read the total inside the transaction to decide on scheduling.
        let tree_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
            .bind(tree_id.to_string())
            .fetch_one(&mut *tx)
            .await
            .map_err(|e| format!("Failed to fetch tree: {}", e))?;

        // Last reference gone: start the scheduled-deletion grace period.
        let ref_count: i64 = tree_row.get("ref_count");
        if ref_count == 0 {
            sqlx::query(
                "UPDATE trees SET state = 'scheduled_delete', scheduled_deletion_at = ?, updated_at = ? WHERE id = ?",
            )
            .bind(now)
            .bind(now)
            .bind(tree_id.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to schedule tree deletion: {}", e))?;
        }

        tx.commit().await.map_err(|e| e.to_string())?;
        Ok(ref_count)
    }
735
736 pub async fn node_create_text(
738 &self,
739 tree_id: &TreeId,
740 parent: Option<NodeId>,
741 content: String,
742 metadata: Option<Value>,
743 ) -> Result<NodeId, ArborError> {
744 let node_id = NodeId::new();
745 let now = current_timestamp();
746
747 let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
748
749 sqlx::query(
750 "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, metadata, created_at)
751 VALUES (?, ?, ?, 1, 'active', 'text', ?, ?, ?)",
752 )
753 .bind(node_id.to_string())
754 .bind(tree_id.to_string())
755 .bind(parent.map(|p| p.to_string()))
756 .bind(&content)
757 .bind(metadata_json)
758 .bind(now)
759 .execute(&self.pool)
760 .await
761 .map_err(|e| format!("Failed to create text node: {}", e))?;
762
763 if let Some(parent_id) = parent {
765 self.add_child_to_parent(&parent_id, &node_id).await?;
766 }
767
768 Ok(node_id)
769 }
770
771 pub async fn node_create_external(
773 &self,
774 tree_id: &TreeId,
775 parent: Option<NodeId>,
776 handle: Handle,
777 metadata: Option<Value>,
778 ) -> Result<NodeId, ArborError> {
779 let node_id = NodeId::new();
780 let now = current_timestamp();
781
782 let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
783 let meta_json = serde_json::to_string(&handle.meta).unwrap();
784
785 sqlx::query(
786 "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
787 VALUES (?, ?, ?, 1, 'active', 'external', ?, ?, ?, ?, ?, ?)",
788 )
789 .bind(node_id.to_string())
790 .bind(tree_id.to_string())
791 .bind(parent.map(|p| p.to_string()))
792 .bind(handle.plugin_id.to_string())
793 .bind(&handle.version)
794 .bind(&handle.method)
795 .bind(&meta_json)
796 .bind(metadata_json)
797 .bind(now)
798 .execute(&self.pool)
799 .await
800 .map_err(|e| format!("Failed to create external node: {}", e))?;
801
802 if let Some(parent_id) = parent {
804 self.add_child_to_parent(&parent_id, &node_id).await?;
805 }
806
807 Ok(node_id)
808 }
809
810 pub async fn node_create_external_ephemeral(
812 &self,
813 tree_id: &TreeId,
814 parent: Option<NodeId>,
815 handle: Handle,
816 metadata: Option<Value>,
817 ) -> Result<NodeId, ArborError> {
818 let node_id = NodeId::new();
819 let now = current_timestamp();
820
821 let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
822 let meta_json = serde_json::to_string(&handle.meta).unwrap();
823
824 sqlx::query(
825 "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
826 VALUES (?, ?, ?, 0, 'scheduled_delete', ?, 'external', ?, ?, ?, ?, ?, ?)",
827 )
828 .bind(node_id.to_string())
829 .bind(tree_id.to_string())
830 .bind(parent.map(|p| p.to_string()))
831 .bind(now) .bind(handle.plugin_id.to_string())
833 .bind(&handle.version)
834 .bind(&handle.method)
835 .bind(&meta_json)
836 .bind(metadata_json)
837 .bind(now)
838 .execute(&self.pool)
839 .await
840 .map_err(|e| format!("Failed to create ephemeral external node: {}", e))?;
841
842 if let Some(parent_id) = parent {
844 self.add_child_to_parent(&parent_id, &node_id).await?;
845 }
846
847 Ok(node_id)
848 }
849
    /// Fetch a single node from `tree_id`.
    ///
    /// NOTE(review): this materializes *every* node of the tree (including
    /// the per-node child/ref queries) just to return one; a direct
    /// `WHERE id = ?` query would be much cheaper for large trees.
    pub async fn node_get(
        &self,
        tree_id: &TreeId,
        node_id: &NodeId,
    ) -> Result<Node, ArborError> {
        let nodes = self.get_nodes_for_tree(tree_id).await?;
        nodes
            .get(node_id)
            .cloned()
            .ok_or_else(|| format!("Node not found: {}", node_id).into())
    }
862
863 pub async fn node_get_children(
865 &self,
866 tree_id: &TreeId,
867 node_id: &NodeId,
868 ) -> Result<Vec<NodeId>, ArborError> {
869 let rows = sqlx::query(
870 "SELECT id FROM nodes WHERE tree_id = ? AND parent_id = ? AND state = 'active'",
871 )
872 .bind(tree_id.to_string())
873 .bind(node_id.to_string())
874 .fetch_all(&self.pool)
875 .await
876 .map_err(|e| format!("Failed to fetch children: {}", e))?;
877
878 let children: Result<Vec<NodeId>, ArborError> = rows
879 .iter()
880 .map(|row| {
881 let id_str: String = row.get("id");
882 ArborId::parse_str(&id_str)
883 .map_err(|e| format!("Invalid node ID: {}", e).into())
884 })
885 .collect();
886
887 children
888 }
889
890 pub async fn node_get_parent(
892 &self,
893 tree_id: &TreeId,
894 node_id: &NodeId,
895 ) -> Result<Option<NodeId>, ArborError> {
896 let row = sqlx::query(
897 "SELECT parent_id FROM nodes WHERE tree_id = ? AND id = ?",
898 )
899 .bind(tree_id.to_string())
900 .bind(node_id.to_string())
901 .fetch_optional(&self.pool)
902 .await
903 .map_err(|e| format!("Failed to fetch parent: {}", e))?
904 .ok_or_else(|| format!("Node not found: {}", node_id))?;
905
906 let parent_id: Option<String> = row.get("parent_id");
907 match parent_id {
908 Some(id_str) => Ok(Some(
909 ArborId::parse_str(&id_str)
910 .map_err(|e| format!("Invalid parent ID: {}", e))?,
911 )),
912 None => Ok(None),
913 }
914 }
915
916 pub async fn node_get_path(
918 &self,
919 tree_id: &TreeId,
920 node_id: &NodeId,
921 ) -> Result<Vec<NodeId>, ArborError> {
922 let mut path = Vec::new();
923 let mut current_id = Some(*node_id);
924
925 while let Some(id) = current_id {
927 path.push(id);
928 current_id = self.node_get_parent(tree_id, &id).await?;
929 }
930
931 path.reverse();
933 Ok(path)
934 }
935
936 pub async fn context_list_leaves(
938 &self,
939 tree_id: &TreeId,
940 ) -> Result<Vec<NodeId>, ArborError> {
941 let rows = sqlx::query(
942 "SELECT id FROM nodes
943 WHERE tree_id = ? AND state = 'active'
944 AND id NOT IN (SELECT DISTINCT parent_id FROM nodes WHERE parent_id IS NOT NULL AND tree_id = ?)",
945 )
946 .bind(tree_id.to_string())
947 .bind(tree_id.to_string())
948 .fetch_all(&self.pool)
949 .await
950 .map_err(|e| format!("Failed to fetch leaf nodes: {}", e))?;
951
952 let leaves: Result<Vec<NodeId>, ArborError> = rows
953 .iter()
954 .map(|row| {
955 let id_str: String = row.get("id");
956 ArborId::parse_str(&id_str)
957 .map_err(|e| format!("Invalid node ID: {}", e).into())
958 })
959 .collect();
960
961 leaves
962 }
963
964 pub async fn context_get_path(
966 &self,
967 tree_id: &TreeId,
968 node_id: &NodeId,
969 ) -> Result<Vec<Node>, ArborError> {
970 let path_ids = self.node_get_path(tree_id, node_id).await?;
971 let nodes = self.get_nodes_for_tree(tree_id).await?;
972
973 let path_nodes: Result<Vec<Node>, ArborError> = path_ids
974 .iter()
975 .map(|id| {
976 nodes
977 .get(id)
978 .cloned()
979 .ok_or_else(|| format!("Node not found in path: {}", id).into())
980 })
981 .collect();
982
983 path_nodes
984 }
985
986 pub async fn context_get_handles(
988 &self,
989 tree_id: &TreeId,
990 node_id: &NodeId,
991 ) -> Result<Vec<Handle>, ArborError> {
992 let path_nodes = self.context_get_path(tree_id, node_id).await?;
993
994 let handles: Vec<Handle> = path_nodes
995 .iter()
996 .filter_map(|node| match &node.data {
997 NodeType::External { handle } => Some(handle.clone()),
998 NodeType::Text { .. } => None,
999 })
1000 .collect();
1001
1002 Ok(handles)
1003 }
1004
1005 pub async fn cleanup_scheduled_trees(&self) -> Result<usize, ArborError> {
1007 let now = current_timestamp();
1008 let seven_days_ago = now - (7 * 24 * 60 * 60);
1009
1010 let result = sqlx::query(
1011 "UPDATE trees
1012 SET state = 'archived', archived_at = ?, updated_at = ?
1013 WHERE state = 'scheduled_delete' AND scheduled_deletion_at < ?",
1014 )
1015 .bind(now)
1016 .bind(now)
1017 .bind(seven_days_ago)
1018 .execute(&self.pool)
1019 .await
1020 .map_err(|e| format!("Failed to archive trees: {}", e))?;
1021
1022 Ok(result.rows_affected() as usize)
1023 }
1024
1025 pub async fn tree_query_by_metadata(&self, filter: &Value) -> Result<Vec<TreeId>, ArborError> {
1027 let rows = sqlx::query(
1028 "SELECT id, metadata FROM trees WHERE state = 'active'"
1029 )
1030 .fetch_all(&self.pool)
1031 .await
1032 .map_err(|e| format!("Failed to query trees: {}", e))?;
1033
1034 let mut matching_trees = Vec::new();
1035
1036 for row in rows {
1037 let tree_id_str: String = row.get("id");
1038 let metadata_str: Option<String> = row.get("metadata");
1039
1040 if let Some(metadata_json) = metadata_str {
1041 if let Ok(metadata) = serde_json::from_str::<Value>(&metadata_json) {
1042 if metadata_matches(filter, &metadata) {
1044 if let Ok(tree_id) = ArborId::parse_str(&tree_id_str) {
1045 matching_trees.push(tree_id);
1046 }
1047 }
1048 }
1049 }
1050 }
1051
1052 Ok(matching_trees)
1053 }
1054}
1055
1056fn metadata_matches(filter: &Value, metadata: &Value) -> bool {
1058 match (filter, metadata) {
1059 (Value::Object(filter_obj), Value::Object(meta_obj)) => {
1060 filter_obj.iter().all(|(key, filter_value)| {
1062 meta_obj.get(key).map_or(false, |meta_value| {
1063 metadata_matches(filter_value, meta_value)
1064 })
1065 })
1066 }
1067 (Value::Array(filter_arr), Value::Array(meta_arr)) => {
1068 filter_arr == meta_arr
1070 }
1071 (filter_val, meta_val) => filter_val == meta_val,
1072 }
1073}
1074
/// Current wall-clock time as whole seconds since the UNIX epoch.
fn current_timestamp() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Only fails if the system clock is set before 1970 — a broken
        // environment; state the invariant instead of a bare unwrap.
        .expect("system clock is set before the UNIX epoch")
        .as_secs() as i64
}