1use super::types::{
2 ArborError, ArborId, Node, NodeId, NodeType, ResourceRefs, ResourceState, Tree, TreeId, Handle,
3};
4use serde_json::Value;
5use sqlx::{sqlite::{SqliteConnectOptions, SqlitePool}, ConnectOptions, Row};
6use uuid::Uuid;
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::time::{SystemTime, UNIX_EPOCH};
10
/// Tunable settings for the arbor storage engine.
#[derive(Debug, Clone)]
pub struct ArborConfig {
    // Seconds a tree may sit in `scheduled_delete` before cleanup archives it
    // (default 604800 = 7 days; see `Default`).
    pub scheduled_deletion_window: i64,
    // Seconds an archived resource is retained (default 2592000 = 30 days).
    // NOTE(review): not read anywhere in this file yet — presumably used by a
    // purge pass elsewhere; confirm.
    pub archive_window: i64,
    // Filesystem location of the SQLite database file.
    pub db_path: PathBuf,
    // Whether periodic cleanup should run automatically.
    pub auto_cleanup: bool,
    // Seconds between automatic cleanup passes (default 3600 = hourly).
    pub cleanup_interval: i64,
}
29
impl Default for ArborConfig {
    /// Defaults: 7-day deletion window, 30-day archive window, `arbor.db`
    /// in the working directory, hourly automatic cleanup.
    fn default() -> Self {
        Self {
            scheduled_deletion_window: 604800, // 7 days in seconds
            archive_window: 2592000,           // 30 days in seconds
            db_path: PathBuf::from("arbor.db"),
            auto_cleanup: true,
            cleanup_interval: 3600, // hourly
        }
    }
}
41
/// SQLite-backed storage for trees, nodes, and their reference counts.
pub struct ArborStorage {
    // Connection pool shared by every query in this impl.
    pool: SqlitePool,
    // Retention/cleanup settings; not all fields are read by this file yet.
    #[allow(dead_code)]
    config: ArborConfig,
}
71
72impl ArborStorage {
73 pub async fn new(config: ArborConfig) -> Result<Self, ArborError> {
75 let db_url = format!("sqlite:{}?mode=rwc", config.db_path.display());
76 let connect_options: SqliteConnectOptions = db_url.parse()
77 .map_err(|e| format!("Failed to parse database URL: {}", e))?;
78 let connect_options = connect_options.disable_statement_logging();
79 let pool = SqlitePool::connect_with(connect_options.clone())
80 .await
81 .map_err(|e| format!("Failed to connect to database: {}", e))?;
82
83 let storage = Self { pool, config };
84 storage.run_migrations().await?;
85
86 Ok(storage)
87 }
88
    /// Create the schema (tables and indexes) if it does not already exist.
    ///
    /// Idempotent: every statement uses `IF NOT EXISTS`.
    /// NOTE(review): this relies on the sqlx SQLite driver executing the whole
    /// multi-statement batch in one `execute` call — confirm for the pinned
    /// sqlx version.
    async fn run_migrations(&self) -> Result<(), ArborError> {
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS trees (
                id TEXT PRIMARY KEY,
                root_node_id TEXT NOT NULL,
                ref_count INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'active',
                scheduled_deletion_at INTEGER,
                archived_at INTEGER,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL,
                metadata TEXT
            );

            CREATE TABLE IF NOT EXISTS tree_refs (
                tree_id TEXT NOT NULL,
                owner_id TEXT NOT NULL,
                count INTEGER NOT NULL DEFAULT 1,
                claimed_at INTEGER NOT NULL,
                PRIMARY KEY (tree_id, owner_id),
                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS nodes (
                id TEXT PRIMARY KEY,
                tree_id TEXT NOT NULL,
                parent_id TEXT,
                ref_count INTEGER NOT NULL DEFAULT 1,
                state TEXT NOT NULL DEFAULT 'active',
                scheduled_deletion_at INTEGER,
                archived_at INTEGER,
                node_type TEXT NOT NULL,
                content TEXT,
                handle_plugin_id TEXT,
                handle_version TEXT,
                handle_method TEXT,
                handle_meta TEXT,
                created_at INTEGER NOT NULL,
                metadata TEXT,
                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS node_refs (
                node_id TEXT NOT NULL,
                owner_id TEXT NOT NULL,
                count INTEGER NOT NULL DEFAULT 1,
                claimed_at INTEGER NOT NULL,
                PRIMARY KEY (node_id, owner_id),
                FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS node_children (
                parent_id TEXT NOT NULL,
                child_id TEXT NOT NULL,
                position INTEGER NOT NULL,
                PRIMARY KEY (parent_id, child_id),
                FOREIGN KEY (parent_id) REFERENCES nodes(id) ON DELETE CASCADE,
                FOREIGN KEY (child_id) REFERENCES nodes(id) ON DELETE CASCADE
            );

            CREATE INDEX IF NOT EXISTS idx_trees_state ON trees(state);
            CREATE INDEX IF NOT EXISTS idx_trees_scheduled ON trees(scheduled_deletion_at) WHERE state = 'scheduled_delete';
            CREATE INDEX IF NOT EXISTS idx_trees_archived ON trees(archived_at) WHERE state = 'archived';
            CREATE INDEX IF NOT EXISTS idx_nodes_tree ON nodes(tree_id);
            CREATE INDEX IF NOT EXISTS idx_nodes_parent ON nodes(parent_id);
            CREATE INDEX IF NOT EXISTS idx_nodes_state ON nodes(state);
            CREATE INDEX IF NOT EXISTS idx_nodes_scheduled ON nodes(scheduled_deletion_at) WHERE state = 'scheduled_delete';
            CREATE INDEX IF NOT EXISTS idx_node_children_parent ON node_children(parent_id);
            CREATE INDEX IF NOT EXISTS idx_node_children_child ON node_children(child_id);
            "#,
        )
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to run migrations: {}", e))?;

        Ok(())
    }
168
169 pub async fn tree_create(
175 &self,
176 metadata: Option<serde_json::Value>,
177 owner_id: &str,
178 ) -> Result<TreeId, ArborError> {
179 let tree_id = TreeId::new();
180 let root_id = NodeId::new();
181 let now = current_timestamp();
182
183 let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
184
185 let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
187 sqlx::query(
188 "INSERT INTO trees (id, root_node_id, ref_count, state, created_at, updated_at, metadata)
189 VALUES (?, ?, 1, 'active', ?, ?, ?)",
190 )
191 .bind(tree_id.to_string())
192 .bind(root_id.to_string())
193 .bind(now)
194 .bind(now)
195 .bind(metadata_json)
196 .execute(&mut *tx)
197 .await
198 .map_err(|e| format!("Failed to create tree: {}", e))?;
199
200 sqlx::query(
202 "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at) VALUES (?, ?, 1, ?)",
203 )
204 .bind(tree_id.to_string())
205 .bind(owner_id)
206 .bind(now)
207 .execute(&mut *tx)
208 .await
209 .map_err(|e| format!("Failed to create tree ref: {}", e))?;
210
211 sqlx::query(
213 "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, created_at)
214 VALUES (?, ?, NULL, 1, 'active', 'text', '', ?)",
215 )
216 .bind(root_id.to_string())
217 .bind(tree_id.to_string())
218 .bind(now)
219 .execute(&mut *tx)
220 .await
221 .map_err(|e| format!("Failed to create root node: {}", e))?;
222
223 tx.commit().await.map_err(|e| e.to_string())?;
224
225 Ok(tree_id)
226 }
227
    /// Fetch a tree and all of its nodes; errors if the tree is archived.
    pub async fn tree_get(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
        self.tree_get_internal(tree_id, false).await
    }
232
    /// Fetch a tree and all of its nodes, including archived trees.
    pub async fn tree_get_archived(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
        self.tree_get_internal(tree_id, true).await
    }
237
    /// Shared implementation for `tree_get` / `tree_get_archived`.
    ///
    /// Loads the tree row, then all nodes and per-owner references, and
    /// assembles them into a `Tree`. When `allow_archived` is false an
    /// archived tree is rejected with an error.
    async fn tree_get_internal(
        &self,
        tree_id: &TreeId,
        allow_archived: bool,
    ) -> Result<Tree, ArborError> {
        let tree_row = sqlx::query(
            "SELECT id, root_node_id, ref_count, state, scheduled_deletion_at, archived_at, created_at, updated_at, metadata
             FROM trees WHERE id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| format!("Failed to fetch tree: {}", e))?
        .ok_or_else(|| format!("Tree not found: {}", tree_id))?;

        // Unknown state strings fall back to Active rather than erroring.
        let state_str: String = tree_row.get("state");
        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

        if !allow_archived && state == ResourceState::Archived {
            return Err("Tree is archived, use tree_get_archived()".into());
        }

        let root_node_id: String = tree_row.get("root_node_id");
        let root_node_id = ArborId::parse_str(&root_node_id)
            .map_err(|e| format!("Invalid root node ID: {}", e))?;

        // Loads every node (plus children and refs per node) — see
        // get_nodes_for_tree for the query pattern.
        let nodes = self.get_nodes_for_tree(tree_id).await?;

        let refs = self.get_tree_refs(tree_id).await?;

        // Unparseable metadata is silently dropped (treated as None).
        let metadata_json: Option<String> = tree_row.get("metadata");
        let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());

        Ok(Tree {
            id: *tree_id,
            root: root_node_id,
            nodes,
            state: Some(state),
            refs: Some(refs),
            scheduled_deletion_at: tree_row.get("scheduled_deletion_at"),
            archived_at: tree_row.get("archived_at"),
            created_at: tree_row.get("created_at"),
            updated_at: tree_row.get("updated_at"),
            metadata,
        })
    }
288
    /// Load every node of a tree (any state) into a map keyed by node id.
    ///
    /// NOTE(review): this issues two extra queries per node (children and
    /// refs) — an N+1 pattern that is fine for small trees but worth
    /// batching if trees grow large.
    async fn get_nodes_for_tree(&self, tree_id: &TreeId) -> Result<HashMap<NodeId, Node>, ArborError> {
        let rows = sqlx::query(
            "SELECT id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, archived_at,
                    node_type, content, handle_plugin_id, handle_version, handle_method,
                    handle_meta, created_at, metadata
             FROM nodes WHERE tree_id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to fetch nodes: {}", e))?;

        let mut nodes = HashMap::new();

        for row in rows {
            let node_id_str: String = row.get("id");
            let node_id = ArborId::parse_str(&node_id_str)
                .map_err(|e| format!("Invalid node ID: {}", e))?;

            let parent_id_str: Option<String> = row.get("parent_id");
            let parent_id = parent_id_str
                .map(|s| ArborId::parse_str(&s).map_err(|e| format!("Invalid parent ID: {}", e)))
                .transpose()?;

            // Ordered child list from the node_children table.
            let children = self.get_node_children_internal(&node_id).await?;

            // Decode the node payload according to its discriminator column.
            let node_type_str: String = row.get("node_type");
            let data = match node_type_str.as_str() {
                "text" => {
                    let content: String = row.get("content");
                    NodeType::Text { content }
                }
                "external" => {
                    let plugin_id_str: String = row.get("handle_plugin_id");
                    let plugin_id = Uuid::parse_str(&plugin_id_str)
                        .map_err(|e| format!("Invalid handle plugin_id: {}", e))?;
                    let version: String = row.get("handle_version");
                    let method: String = row.get("handle_method");
                    // Missing or unparseable meta degrades to an empty list.
                    let meta_json: Option<String> = row.get("handle_meta");
                    let meta: Vec<String> = meta_json
                        .and_then(|s| serde_json::from_str(&s).ok())
                        .unwrap_or_default();

                    NodeType::External {
                        handle: Handle::new(plugin_id, version, method)
                            .with_meta(meta),
                    }
                }
                _ => return Err(format!("Unknown node type: {}", node_type_str).into()),
            };

            // Unknown state strings fall back to Active rather than erroring.
            let state_str: String = row.get("state");
            let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

            let refs = self.get_node_refs(&node_id).await?;

            // Unparseable metadata is silently dropped (treated as None).
            let metadata_json: Option<String> = row.get("metadata");
            let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());

            let node = Node {
                id: node_id,
                parent: parent_id,
                children,
                data,
                state: Some(state),
                refs: Some(refs),
                scheduled_deletion_at: row.get("scheduled_deletion_at"),
                archived_at: row.get("archived_at"),
                created_at: row.get("created_at"),
                metadata,
            };

            nodes.insert(node_id, node);
        }

        Ok(nodes)
    }
368
369 async fn get_node_children_internal(&self, node_id: &NodeId) -> Result<Vec<NodeId>, ArborError> {
371 let rows = sqlx::query(
372 "SELECT child_id FROM node_children WHERE parent_id = ? ORDER BY position",
373 )
374 .bind(node_id.to_string())
375 .fetch_all(&self.pool)
376 .await
377 .map_err(|e| format!("Failed to fetch node children: {}", e))?;
378
379 let children: Result<Vec<NodeId>, ArborError> = rows
380 .iter()
381 .map(|row| {
382 let child_id_str: String = row.get("child_id");
383 ArborId::parse_str(&child_id_str)
384 .map_err(|e| e.into())
385 })
386 .collect();
387
388 children
389 }
390
391 async fn add_child_to_parent(&self, parent_id: &NodeId, child_id: &NodeId) -> Result<(), ArborError> {
393 let row = sqlx::query(
395 "SELECT COALESCE(MAX(position), -1) + 1 as next_pos FROM node_children WHERE parent_id = ?",
396 )
397 .bind(parent_id.to_string())
398 .fetch_one(&self.pool)
399 .await
400 .map_err(|e| format!("Failed to get next position: {}", e))?;
401
402 let next_pos: i64 = row.get("next_pos");
403
404 sqlx::query(
405 "INSERT INTO node_children (parent_id, child_id, position) VALUES (?, ?, ?)",
406 )
407 .bind(parent_id.to_string())
408 .bind(child_id.to_string())
409 .bind(next_pos)
410 .execute(&self.pool)
411 .await
412 .map_err(|e| format!("Failed to add child to parent: {}", e))?;
413
414 Ok(())
415 }
416
417 async fn get_tree_refs(&self, tree_id: &TreeId) -> Result<ResourceRefs, ArborError> {
419 let rows = sqlx::query(
420 "SELECT owner_id, count FROM tree_refs WHERE tree_id = ?",
421 )
422 .bind(tree_id.to_string())
423 .fetch_all(&self.pool)
424 .await
425 .map_err(|e| format!("Failed to fetch tree refs: {}", e))?;
426
427 let mut owners = HashMap::new();
428 let mut total = 0i64;
429
430 for row in rows {
431 let owner_id: String = row.get("owner_id");
432 let count: i64 = row.get("count");
433 owners.insert(owner_id, count);
434 total += count;
435 }
436
437 Ok(ResourceRefs {
438 ref_count: total,
439 owners,
440 })
441 }
442
443 async fn get_node_refs(&self, node_id: &NodeId) -> Result<ResourceRefs, ArborError> {
445 let rows = sqlx::query(
446 "SELECT owner_id, count FROM node_refs WHERE node_id = ?",
447 )
448 .bind(node_id.to_string())
449 .fetch_all(&self.pool)
450 .await
451 .map_err(|e| format!("Failed to fetch node refs: {}", e))?;
452
453 let mut owners = HashMap::new();
454 let mut total = 0i64;
455
456 for row in rows {
457 let owner_id: String = row.get("owner_id");
458 let count: i64 = row.get("count");
459 owners.insert(owner_id, count);
460 total += count;
461 }
462
463 Ok(ResourceRefs {
464 ref_count: total,
465 owners,
466 })
467 }
468
469 pub async fn tree_list(&self, include_scheduled: bool) -> Result<Vec<TreeId>, ArborError> {
471 let query = if include_scheduled {
472 "SELECT id FROM trees WHERE state IN ('active', 'scheduled_delete')"
473 } else {
474 "SELECT id FROM trees WHERE state = 'active'"
475 };
476
477 let rows = sqlx::query(query)
478 .fetch_all(&self.pool)
479 .await
480 .map_err(|e| format!("Failed to list trees: {}", e))?;
481
482 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
483 .iter()
484 .map(|row| {
485 let id_str: String = row.get("id");
486 ArborId::parse_str(&id_str)
487 .map_err(|e| format!("Invalid tree ID: {}", e).into())
488 })
489 .collect();
490
491 tree_ids
492 }
493
494 pub async fn tree_list_scheduled(&self) -> Result<Vec<TreeId>, ArborError> {
496 let rows = sqlx::query(
497 "SELECT id FROM trees WHERE state = 'scheduled_delete'",
498 )
499 .fetch_all(&self.pool)
500 .await
501 .map_err(|e| format!("Failed to list scheduled trees: {}", e))?;
502
503 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
504 .iter()
505 .map(|row| {
506 let id_str: String = row.get("id");
507 ArborId::parse_str(&id_str)
508 .map_err(|e| format!("Invalid tree ID: {}", e).into())
509 })
510 .collect();
511
512 tree_ids
513 }
514
515 pub async fn tree_list_archived(&self) -> Result<Vec<TreeId>, ArborError> {
517 let rows = sqlx::query(
518 "SELECT id FROM trees WHERE state = 'archived'",
519 )
520 .fetch_all(&self.pool)
521 .await
522 .map_err(|e| format!("Failed to list archived trees: {}", e))?;
523
524 let tree_ids: Result<Vec<TreeId>, ArborError> = rows
525 .iter()
526 .map(|row| {
527 let id_str: String = row.get("id");
528 ArborId::parse_str(&id_str)
529 .map_err(|e| format!("Invalid tree ID: {}", e).into())
530 })
531 .collect();
532
533 tree_ids
534 }
535
536 pub async fn tree_update_metadata(
538 &self,
539 tree_id: &TreeId,
540 metadata: Value,
541 ) -> Result<(), ArborError> {
542 let now = current_timestamp();
543 let metadata_json = serde_json::to_string(&metadata)
544 .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
545
546 sqlx::query(
547 "UPDATE trees SET metadata = ?, updated_at = ? WHERE id = ? AND state = 'active'",
548 )
549 .bind(metadata_json)
550 .bind(now)
551 .bind(tree_id.to_string())
552 .execute(&self.pool)
553 .await
554 .map_err(|e| format!("Failed to update tree metadata: {}", e))?;
555
556 Ok(())
557 }
558
    /// Add `count` references to a tree on behalf of `owner_id`.
    ///
    /// Claiming a tree in `scheduled_delete` reactivates it; claiming an
    /// archived tree is an error. Runs in a transaction and returns the
    /// tree's new total `ref_count`.
    pub async fn tree_claim(
        &self,
        tree_id: &TreeId,
        owner_id: &str,
        count: i64,
    ) -> Result<i64, ArborError> {
        let now = current_timestamp();
        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;

        let tree_row = sqlx::query(
            "SELECT state, scheduled_deletion_at FROM trees WHERE id = ?",
        )
        .bind(tree_id.to_string())
        .fetch_optional(&mut *tx)
        .await
        .map_err(|e| format!("Failed to fetch tree: {}", e))?
        .ok_or_else(|| format!("Tree not found: {}", tree_id))?;

        let state_str: String = tree_row.get("state");
        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);

        if state == ResourceState::Archived {
            // Returning here drops `tx`, which rolls the transaction back.
            return Err("Cannot claim archived tree".into());
        }

        // A claim rescues a tree that was pending deletion.
        if state == ResourceState::ScheduledDelete {
            sqlx::query(
                "UPDATE trees SET state = 'active', scheduled_deletion_at = NULL, updated_at = ? WHERE id = ?",
            )
            .bind(now)
            .bind(tree_id.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to reactivate tree: {}", e))?;
        }

        // Upsert: accumulate into an existing owner row or create a new one.
        sqlx::query(
            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(tree_id, owner_id) DO UPDATE SET
                count = count + excluded.count,
                claimed_at = excluded.claimed_at",
        )
        .bind(tree_id.to_string())
        .bind(owner_id)
        .bind(count)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to claim tree: {}", e))?;

        // Keep the denormalized total on the tree row in sync.
        sqlx::query(
            "UPDATE trees SET ref_count = ref_count + ?, updated_at = ? WHERE id = ?",
        )
        .bind(count)
        .bind(now)
        .bind(tree_id.to_string())
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;

        let new_count_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
            .bind(tree_id.to_string())
            .fetch_one(&mut *tx)
            .await
            .map_err(|e| format!("Failed to fetch new ref_count: {}", e))?;

        let new_count: i64 = new_count_row.get("ref_count");

        tx.commit().await.map_err(|e| e.to_string())?;
        Ok(new_count)
    }
637
    /// Release `count` references held by `owner_id` on a tree.
    ///
    /// Removes the owner's ref row when its count reaches zero, and when
    /// the tree's total ref_count hits zero the tree is moved to
    /// `scheduled_delete`. Runs in a transaction and returns the tree's
    /// total `ref_count` after the release.
    pub async fn tree_release(
        &self,
        tree_id: &TreeId,
        owner_id: &str,
        count: i64,
    ) -> Result<i64, ArborError> {
        let now = current_timestamp();
        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;

        let owner_ref = sqlx::query(
            "SELECT count FROM tree_refs WHERE tree_id = ? AND owner_id = ?",
        )
        .bind(tree_id.to_string())
        .bind(owner_id)
        .fetch_optional(&mut *tx)
        .await
        .map_err(|e| format!("Failed to fetch tree ref: {}", e))?
        .ok_or_else(|| format!("No reference found for owner {}", owner_id))?;

        // An owner cannot release more references than it holds.
        let current_count: i64 = owner_ref.get("count");
        if current_count < count {
            return Err(format!(
                "Cannot release {} references, owner only has {}",
                count, current_count
            )
            .into());
        }

        let new_count = current_count - count;

        // Drop the owner row entirely once its count reaches zero.
        if new_count == 0 {
            sqlx::query("DELETE FROM tree_refs WHERE tree_id = ? AND owner_id = ?")
                .bind(tree_id.to_string())
                .bind(owner_id)
                .execute(&mut *tx)
                .await
                .map_err(|e| format!("Failed to delete tree ref: {}", e))?;
        } else {
            sqlx::query(
                "UPDATE tree_refs SET count = ?, claimed_at = ? WHERE tree_id = ? AND owner_id = ?",
            )
            .bind(new_count)
            .bind(now)
            .bind(tree_id.to_string())
            .bind(owner_id)
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to update tree ref: {}", e))?;
        }

        // Keep the denormalized total on the tree row in sync.
        sqlx::query(
            "UPDATE trees SET ref_count = ref_count - ?, updated_at = ? WHERE id = ?",
        )
        .bind(count)
        .bind(now)
        .bind(tree_id.to_string())
        .execute(&mut *tx)
        .await
        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;

        let tree_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
            .bind(tree_id.to_string())
            .fetch_one(&mut *tx)
            .await
            .map_err(|e| format!("Failed to fetch tree: {}", e))?;

        // Last reference gone: schedule the tree for deletion.
        let ref_count: i64 = tree_row.get("ref_count");
        if ref_count == 0 {
            sqlx::query(
                "UPDATE trees SET state = 'scheduled_delete', scheduled_deletion_at = ?, updated_at = ? WHERE id = ?",
            )
            .bind(now)
            .bind(now)
            .bind(tree_id.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to schedule tree deletion: {}", e))?;
        }

        tx.commit().await.map_err(|e| e.to_string())?;
        Ok(ref_count)
    }
725
726 pub async fn node_create_text(
728 &self,
729 tree_id: &TreeId,
730 parent: Option<NodeId>,
731 content: String,
732 metadata: Option<Value>,
733 ) -> Result<NodeId, ArborError> {
734 let node_id = NodeId::new();
735 let now = current_timestamp();
736
737 let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
738
739 sqlx::query(
740 "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, metadata, created_at)
741 VALUES (?, ?, ?, 1, 'active', 'text', ?, ?, ?)",
742 )
743 .bind(node_id.to_string())
744 .bind(tree_id.to_string())
745 .bind(parent.map(|p| p.to_string()))
746 .bind(&content)
747 .bind(metadata_json)
748 .bind(now)
749 .execute(&self.pool)
750 .await
751 .map_err(|e| format!("Failed to create text node: {}", e))?;
752
753 if let Some(parent_id) = parent {
755 self.add_child_to_parent(&parent_id, &node_id).await?;
756 }
757
758 Ok(node_id)
759 }
760
761 pub async fn node_create_external(
763 &self,
764 tree_id: &TreeId,
765 parent: Option<NodeId>,
766 handle: Handle,
767 metadata: Option<Value>,
768 ) -> Result<NodeId, ArborError> {
769 let node_id = NodeId::new();
770 let now = current_timestamp();
771
772 let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
773 let meta_json = serde_json::to_string(&handle.meta).unwrap();
774
775 sqlx::query(
776 "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
777 VALUES (?, ?, ?, 1, 'active', 'external', ?, ?, ?, ?, ?, ?)",
778 )
779 .bind(node_id.to_string())
780 .bind(tree_id.to_string())
781 .bind(parent.map(|p| p.to_string()))
782 .bind(handle.plugin_id.to_string())
783 .bind(&handle.version)
784 .bind(&handle.method)
785 .bind(&meta_json)
786 .bind(metadata_json)
787 .bind(now)
788 .execute(&self.pool)
789 .await
790 .map_err(|e| format!("Failed to create external node: {}", e))?;
791
792 if let Some(parent_id) = parent {
794 self.add_child_to_parent(&parent_id, &node_id).await?;
795 }
796
797 Ok(node_id)
798 }
799
800 pub async fn node_create_external_ephemeral(
802 &self,
803 tree_id: &TreeId,
804 parent: Option<NodeId>,
805 handle: Handle,
806 metadata: Option<Value>,
807 ) -> Result<NodeId, ArborError> {
808 let node_id = NodeId::new();
809 let now = current_timestamp();
810
811 let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
812 let meta_json = serde_json::to_string(&handle.meta).unwrap();
813
814 sqlx::query(
815 "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
816 VALUES (?, ?, ?, 0, 'scheduled_delete', ?, 'external', ?, ?, ?, ?, ?, ?)",
817 )
818 .bind(node_id.to_string())
819 .bind(tree_id.to_string())
820 .bind(parent.map(|p| p.to_string()))
821 .bind(now) .bind(handle.plugin_id.to_string())
823 .bind(&handle.version)
824 .bind(&handle.method)
825 .bind(&meta_json)
826 .bind(metadata_json)
827 .bind(now)
828 .execute(&self.pool)
829 .await
830 .map_err(|e| format!("Failed to create ephemeral external node: {}", e))?;
831
832 if let Some(parent_id) = parent {
834 self.add_child_to_parent(&parent_id, &node_id).await?;
835 }
836
837 Ok(node_id)
838 }
839
    /// Fetch a single node from a tree.
    ///
    /// NOTE(review): this loads every node in the tree via
    /// `get_nodes_for_tree` just to return one — fine for small trees, but
    /// a targeted single-row query would be cheaper for large ones.
    pub async fn node_get(
        &self,
        tree_id: &TreeId,
        node_id: &NodeId,
    ) -> Result<Node, ArborError> {
        let nodes = self.get_nodes_for_tree(tree_id).await?;
        nodes
            .get(node_id)
            .cloned()
            .ok_or_else(|| format!("Node not found: {}", node_id).into())
    }
852
853 pub async fn node_get_children(
855 &self,
856 tree_id: &TreeId,
857 node_id: &NodeId,
858 ) -> Result<Vec<NodeId>, ArborError> {
859 let rows = sqlx::query(
860 "SELECT id FROM nodes WHERE tree_id = ? AND parent_id = ? AND state = 'active'",
861 )
862 .bind(tree_id.to_string())
863 .bind(node_id.to_string())
864 .fetch_all(&self.pool)
865 .await
866 .map_err(|e| format!("Failed to fetch children: {}", e))?;
867
868 let children: Result<Vec<NodeId>, ArborError> = rows
869 .iter()
870 .map(|row| {
871 let id_str: String = row.get("id");
872 ArborId::parse_str(&id_str)
873 .map_err(|e| format!("Invalid node ID: {}", e).into())
874 })
875 .collect();
876
877 children
878 }
879
880 pub async fn node_get_parent(
882 &self,
883 tree_id: &TreeId,
884 node_id: &NodeId,
885 ) -> Result<Option<NodeId>, ArborError> {
886 let row = sqlx::query(
887 "SELECT parent_id FROM nodes WHERE tree_id = ? AND id = ?",
888 )
889 .bind(tree_id.to_string())
890 .bind(node_id.to_string())
891 .fetch_optional(&self.pool)
892 .await
893 .map_err(|e| format!("Failed to fetch parent: {}", e))?
894 .ok_or_else(|| format!("Node not found: {}", node_id))?;
895
896 let parent_id: Option<String> = row.get("parent_id");
897 match parent_id {
898 Some(id_str) => Ok(Some(
899 ArborId::parse_str(&id_str)
900 .map_err(|e| format!("Invalid parent ID: {}", e))?,
901 )),
902 None => Ok(None),
903 }
904 }
905
906 pub async fn node_get_path(
908 &self,
909 tree_id: &TreeId,
910 node_id: &NodeId,
911 ) -> Result<Vec<NodeId>, ArborError> {
912 let mut path = Vec::new();
913 let mut current_id = Some(*node_id);
914
915 while let Some(id) = current_id {
917 path.push(id);
918 current_id = self.node_get_parent(tree_id, &id).await?;
919 }
920
921 path.reverse();
923 Ok(path)
924 }
925
926 pub async fn context_list_leaves(
928 &self,
929 tree_id: &TreeId,
930 ) -> Result<Vec<NodeId>, ArborError> {
931 let rows = sqlx::query(
932 "SELECT id FROM nodes
933 WHERE tree_id = ? AND state = 'active'
934 AND id NOT IN (SELECT DISTINCT parent_id FROM nodes WHERE parent_id IS NOT NULL AND tree_id = ?)",
935 )
936 .bind(tree_id.to_string())
937 .bind(tree_id.to_string())
938 .fetch_all(&self.pool)
939 .await
940 .map_err(|e| format!("Failed to fetch leaf nodes: {}", e))?;
941
942 let leaves: Result<Vec<NodeId>, ArborError> = rows
943 .iter()
944 .map(|row| {
945 let id_str: String = row.get("id");
946 ArborId::parse_str(&id_str)
947 .map_err(|e| format!("Invalid node ID: {}", e).into())
948 })
949 .collect();
950
951 leaves
952 }
953
954 pub async fn context_get_path(
956 &self,
957 tree_id: &TreeId,
958 node_id: &NodeId,
959 ) -> Result<Vec<Node>, ArborError> {
960 let path_ids = self.node_get_path(tree_id, node_id).await?;
961 let nodes = self.get_nodes_for_tree(tree_id).await?;
962
963 let path_nodes: Result<Vec<Node>, ArborError> = path_ids
964 .iter()
965 .map(|id| {
966 nodes
967 .get(id)
968 .cloned()
969 .ok_or_else(|| format!("Node not found in path: {}", id).into())
970 })
971 .collect();
972
973 path_nodes
974 }
975
976 pub async fn context_get_handles(
978 &self,
979 tree_id: &TreeId,
980 node_id: &NodeId,
981 ) -> Result<Vec<Handle>, ArborError> {
982 let path_nodes = self.context_get_path(tree_id, node_id).await?;
983
984 let handles: Vec<Handle> = path_nodes
985 .iter()
986 .filter_map(|node| match &node.data {
987 NodeType::External { handle } => Some(handle.clone()),
988 NodeType::Text { .. } => None,
989 })
990 .collect();
991
992 Ok(handles)
993 }
994
995 pub async fn cleanup_scheduled_trees(&self) -> Result<usize, ArborError> {
997 let now = current_timestamp();
998 let seven_days_ago = now - (7 * 24 * 60 * 60);
999
1000 let result = sqlx::query(
1001 "UPDATE trees
1002 SET state = 'archived', archived_at = ?, updated_at = ?
1003 WHERE state = 'scheduled_delete' AND scheduled_deletion_at < ?",
1004 )
1005 .bind(now)
1006 .bind(now)
1007 .bind(seven_days_ago)
1008 .execute(&self.pool)
1009 .await
1010 .map_err(|e| format!("Failed to archive trees: {}", e))?;
1011
1012 Ok(result.rows_affected() as usize)
1013 }
1014}
1015
/// Current Unix time in whole seconds.
///
/// # Panics
/// Panics if the system clock reads before the Unix epoch — a host
/// misconfiguration, not a recoverable storage error (was a bare
/// `unwrap()`; now states the invariant).
fn current_timestamp() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_secs() as i64
}