Skip to main content

plexus_substrate/activations/arbor/
storage.rs

1use super::types::{
2    ArborError, ArborId, Node, NodeId, NodeType, ResourceRefs, ResourceState, Tree, TreeId, Handle,
3};
4use serde_json::Value;
5use sqlx::{sqlite::{SqliteConnectOptions, SqlitePool}, ConnectOptions, Row};
6use uuid::Uuid;
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::time::{SystemTime, UNIX_EPOCH};
10
/// Tunable settings for the Arbor storage layer.
#[derive(Debug, Clone)]
pub struct ArborConfig {
    /// Seconds a resource stays scheduled for deletion before it moves to
    /// archived. Default: 7 days (604800).
    pub scheduled_deletion_window: i64,

    /// Seconds an archived resource is kept before it is purged.
    /// Default: 30 days (2592000).
    pub archive_window: i64,

    /// Location of the SQLite database file.
    pub db_path: PathBuf,

    /// Whether the background auto-cleanup task is enabled.
    pub auto_cleanup: bool,

    /// Seconds between runs of the cleanup task. Default: 1 hour (3600).
    pub cleanup_interval: i64,
}
29
30impl Default for ArborConfig {
31    fn default() -> Self {
32        Self {
33            scheduled_deletion_window: 604800,  // 7 days
34            archive_window: 2592000,            // 30 days
35            db_path: PathBuf::from("arbor.db"),
36            auto_cleanup: true,
37            cleanup_interval: 3600,             // 1 hour
38        }
39    }
40}
41
42/// SQLite-based storage for Arbor tree structures.
43///
44/// # Usage Pattern: Direct Injection
45///
46/// ArborStorage is **infrastructure** - activations should receive it directly
47/// at construction time, NOT via Plexus routing.
48///
49/// ```ignore
50/// // Correct: Direct injection
51/// let cone = Cone::new(cone_config, arbor_storage.clone()).await?;
52///
53/// // Then use directly for tree operations
54/// let tree = arbor_storage.tree_get(&tree_id).await?;
55/// let node_id = arbor_storage.node_create_external(&tree_id, parent, handle, None).await?;
56/// ```
57///
58/// **Do NOT** route tree operations through Plexus - that adds unnecessary
59/// serialization overhead for what should be direct method calls.
60///
61/// The **only** case where Plexus is needed for Arbor-related data is
62/// cross-plugin handle resolution: when you have a Handle pointing to
63/// external data (e.g., a Cone message) and need to resolve its content.
64///
65/// See: `docs/architecture/*_arbor-usage-pattern.md`
66pub struct ArborStorage {
67    pool: SqlitePool,
68    #[allow(dead_code)]
69    config: ArborConfig,
70}
71
72impl ArborStorage {
73    /// Create a new storage instance and run migrations
74    pub async fn new(config: ArborConfig) -> Result<Self, ArborError> {
75        let db_url = format!("sqlite:{}?mode=rwc", config.db_path.display());
76        let connect_options: SqliteConnectOptions = db_url.parse()
77            .map_err(|e| format!("Failed to parse database URL: {}", e))?;
78        let connect_options = connect_options.disable_statement_logging();
79        let pool = SqlitePool::connect_with(connect_options.clone())
80            .await
81            .map_err(|e| format!("Failed to connect to database: {}", e))?;
82
83        let storage = Self { pool, config };
84        storage.run_migrations().await?;
85
86        Ok(storage)
87    }
88
89    /// Run database migrations
90    async fn run_migrations(&self) -> Result<(), ArborError> {
91        sqlx::query(
92            r#"
93            CREATE TABLE IF NOT EXISTS trees (
94                id TEXT PRIMARY KEY,
95                root_node_id TEXT NOT NULL,
96                ref_count INTEGER NOT NULL DEFAULT 1,
97                state TEXT NOT NULL DEFAULT 'active',
98                scheduled_deletion_at INTEGER,
99                archived_at INTEGER,
100                created_at INTEGER NOT NULL,
101                updated_at INTEGER NOT NULL,
102                metadata TEXT
103            );
104
105            CREATE TABLE IF NOT EXISTS tree_refs (
106                tree_id TEXT NOT NULL,
107                owner_id TEXT NOT NULL,
108                count INTEGER NOT NULL DEFAULT 1,
109                claimed_at INTEGER NOT NULL,
110                PRIMARY KEY (tree_id, owner_id),
111                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
112            );
113
114            CREATE TABLE IF NOT EXISTS nodes (
115                id TEXT PRIMARY KEY,
116                tree_id TEXT NOT NULL,
117                parent_id TEXT,
118                ref_count INTEGER NOT NULL DEFAULT 1,
119                state TEXT NOT NULL DEFAULT 'active',
120                scheduled_deletion_at INTEGER,
121                archived_at INTEGER,
122                node_type TEXT NOT NULL,
123                content TEXT,
124                handle_plugin_id TEXT,
125                handle_version TEXT,
126                handle_method TEXT,
127                handle_meta TEXT,
128                created_at INTEGER NOT NULL,
129                metadata TEXT,
130                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
131            );
132
133            CREATE TABLE IF NOT EXISTS node_refs (
134                node_id TEXT NOT NULL,
135                owner_id TEXT NOT NULL,
136                count INTEGER NOT NULL DEFAULT 1,
137                claimed_at INTEGER NOT NULL,
138                PRIMARY KEY (node_id, owner_id),
139                FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
140            );
141
142            CREATE TABLE IF NOT EXISTS node_children (
143                parent_id TEXT NOT NULL,
144                child_id TEXT NOT NULL,
145                position INTEGER NOT NULL,
146                PRIMARY KEY (parent_id, child_id),
147                FOREIGN KEY (parent_id) REFERENCES nodes(id) ON DELETE CASCADE,
148                FOREIGN KEY (child_id) REFERENCES nodes(id) ON DELETE CASCADE
149            );
150
151            CREATE INDEX IF NOT EXISTS idx_trees_state ON trees(state);
152            CREATE INDEX IF NOT EXISTS idx_trees_scheduled ON trees(scheduled_deletion_at) WHERE state = 'scheduled_delete';
153            CREATE INDEX IF NOT EXISTS idx_trees_archived ON trees(archived_at) WHERE state = 'archived';
154            CREATE INDEX IF NOT EXISTS idx_nodes_tree ON nodes(tree_id);
155            CREATE INDEX IF NOT EXISTS idx_nodes_parent ON nodes(parent_id);
156            CREATE INDEX IF NOT EXISTS idx_nodes_state ON nodes(state);
157            CREATE INDEX IF NOT EXISTS idx_nodes_scheduled ON nodes(scheduled_deletion_at) WHERE state = 'scheduled_delete';
158            CREATE INDEX IF NOT EXISTS idx_node_children_parent ON node_children(parent_id);
159            CREATE INDEX IF NOT EXISTS idx_node_children_child ON node_children(child_id);
160            "#,
161        )
162        .execute(&self.pool)
163        .await
164        .map_err(|e| format!("Failed to run migrations: {}", e))?;
165
166        Ok(())
167    }
168
169    // ========================================================================
170    // Tree Operations
171    // ========================================================================
172
173    /// Create a new tree with a root node
174    pub async fn tree_create(
175        &self,
176        metadata: Option<serde_json::Value>,
177        owner_id: &str,
178    ) -> Result<TreeId, ArborError> {
179        let tree_id = TreeId::new();
180        let root_id = NodeId::new();
181        let now = current_timestamp();
182
183        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
184
185        // Create tree
186        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
187        sqlx::query(
188            "INSERT INTO trees (id, root_node_id, ref_count, state, created_at, updated_at, metadata)
189             VALUES (?, ?, 1, 'active', ?, ?, ?)",
190        )
191        .bind(tree_id.to_string())
192        .bind(root_id.to_string())
193        .bind(now)
194        .bind(now)
195        .bind(metadata_json)
196        .execute(&mut *tx)
197        .await
198        .map_err(|e| format!("Failed to create tree: {}", e))?;
199
200        // Create tree ref for owner
201        sqlx::query(
202            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at) VALUES (?, ?, 1, ?)",
203        )
204        .bind(tree_id.to_string())
205        .bind(owner_id)
206        .bind(now)
207        .execute(&mut *tx)
208        .await
209        .map_err(|e| format!("Failed to create tree ref: {}", e))?;
210
211        // Create root node (empty text node)
212        sqlx::query(
213            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, created_at)
214             VALUES (?, ?, NULL, 1, 'active', 'text', '', ?)",
215        )
216        .bind(root_id.to_string())
217        .bind(tree_id.to_string())
218        .bind(now)
219        .execute(&mut *tx)
220        .await
221        .map_err(|e| format!("Failed to create root node: {}", e))?;
222
223        tx.commit().await.map_err(|e| e.to_string())?;
224
225        Ok(tree_id)
226    }
227
228    /// Get a tree by ID (only active trees)
229    pub async fn tree_get(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
230        self.tree_get_internal(tree_id, false).await
231    }
232
233    /// Get an archived tree by ID
234    pub async fn tree_get_archived(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
235        self.tree_get_internal(tree_id, true).await
236    }
237
238    /// Internal tree getter
239    async fn tree_get_internal(
240        &self,
241        tree_id: &TreeId,
242        allow_archived: bool,
243    ) -> Result<Tree, ArborError> {
244        // Get tree metadata
245        let tree_row = sqlx::query(
246            "SELECT id, root_node_id, ref_count, state, scheduled_deletion_at, archived_at, created_at, updated_at, metadata
247             FROM trees WHERE id = ?",
248        )
249        .bind(tree_id.to_string())
250        .fetch_optional(&self.pool)
251        .await
252        .map_err(|e| format!("Failed to fetch tree: {}", e))?
253        .ok_or_else(|| format!("Tree not found: {}", tree_id))?;
254
255        let state_str: String = tree_row.get("state");
256        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
257
258        if !allow_archived && state == ResourceState::Archived {
259            return Err("Tree is archived, use tree_get_archived()".into());
260        }
261
262        let root_node_id: String = tree_row.get("root_node_id");
263        let root_node_id = ArborId::parse_str(&root_node_id)
264            .map_err(|e| format!("Invalid root node ID: {}", e))?;
265
266        // Get all nodes for this tree
267        let nodes = self.get_nodes_for_tree(tree_id).await?;
268
269        // Get reference information
270        let refs = self.get_tree_refs(tree_id).await?;
271
272        let metadata_json: Option<String> = tree_row.get("metadata");
273        let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());
274
275        Ok(Tree {
276            id: *tree_id,
277            root: root_node_id,
278            nodes,
279            state: Some(state),
280            refs: Some(refs),
281            scheduled_deletion_at: tree_row.get("scheduled_deletion_at"),
282            archived_at: tree_row.get("archived_at"),
283            created_at: tree_row.get("created_at"),
284            updated_at: tree_row.get("updated_at"),
285            metadata,
286        })
287    }
288
289    /// Get all nodes for a tree
290    async fn get_nodes_for_tree(&self, tree_id: &TreeId) -> Result<HashMap<NodeId, Node>, ArborError> {
291        let rows = sqlx::query(
292            "SELECT id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, archived_at,
293                    node_type, content, handle_plugin_id, handle_version, handle_method,
294                    handle_meta, created_at, metadata
295             FROM nodes WHERE tree_id = ?",
296        )
297        .bind(tree_id.to_string())
298        .fetch_all(&self.pool)
299        .await
300        .map_err(|e| format!("Failed to fetch nodes: {}", e))?;
301
302        let mut nodes = HashMap::new();
303
304        for row in rows {
305            let node_id_str: String = row.get("id");
306            let node_id = ArborId::parse_str(&node_id_str)
307                .map_err(|e| format!("Invalid node ID: {}", e))?;
308
309            let parent_id_str: Option<String> = row.get("parent_id");
310            let parent_id = parent_id_str
311                .map(|s| ArborId::parse_str(&s).map_err(|e| format!("Invalid parent ID: {}", e)))
312                .transpose()?;
313
314            // Get children for this node
315            let children = self.get_node_children_internal(&node_id).await?;
316
317            let node_type_str: String = row.get("node_type");
318            let data = match node_type_str.as_str() {
319                "text" => {
320                    let content: String = row.get("content");
321                    NodeType::Text { content }
322                }
323                "external" => {
324                    let plugin_id_str: String = row.get("handle_plugin_id");
325                    let plugin_id = Uuid::parse_str(&plugin_id_str)
326                        .map_err(|e| format!("Invalid handle plugin_id: {}", e))?;
327                    let version: String = row.get("handle_version");
328                    let method: String = row.get("handle_method");
329                    let meta_json: Option<String> = row.get("handle_meta");
330                    let meta: Vec<String> = meta_json
331                        .and_then(|s| serde_json::from_str(&s).ok())
332                        .unwrap_or_default();
333
334                    NodeType::External {
335                        handle: Handle::new(plugin_id, version, method)
336                            .with_meta(meta),
337                    }
338                }
339                _ => return Err(format!("Unknown node type: {}", node_type_str).into()),
340            };
341
342            let state_str: String = row.get("state");
343            let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
344
345            let refs = self.get_node_refs(&node_id).await?;
346
347            let metadata_json: Option<String> = row.get("metadata");
348            let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());
349
350            let node = Node {
351                id: node_id,
352                parent: parent_id,
353                children,
354                data,
355                state: Some(state),
356                refs: Some(refs),
357                scheduled_deletion_at: row.get("scheduled_deletion_at"),
358                archived_at: row.get("archived_at"),
359                created_at: row.get("created_at"),
360                metadata,
361            };
362
363            nodes.insert(node_id, node);
364        }
365
366        Ok(nodes)
367    }
368
369    /// Get children for a node (ordered by position)
370    async fn get_node_children_internal(&self, node_id: &NodeId) -> Result<Vec<NodeId>, ArborError> {
371        let rows = sqlx::query(
372            "SELECT child_id FROM node_children WHERE parent_id = ? ORDER BY position",
373        )
374        .bind(node_id.to_string())
375        .fetch_all(&self.pool)
376        .await
377        .map_err(|e| format!("Failed to fetch node children: {}", e))?;
378
379        let children: Result<Vec<NodeId>, ArborError> = rows
380            .iter()
381            .map(|row| {
382                let child_id_str: String = row.get("child_id");
383                ArborId::parse_str(&child_id_str)
384                    .map_err(|e| e.into())
385            })
386            .collect();
387
388        children
389    }
390
391    /// Add a child to a parent in the node_children table
392    async fn add_child_to_parent(&self, parent_id: &NodeId, child_id: &NodeId) -> Result<(), ArborError> {
393        // Get next position for this parent
394        let row = sqlx::query(
395            "SELECT COALESCE(MAX(position), -1) + 1 as next_pos FROM node_children WHERE parent_id = ?",
396        )
397        .bind(parent_id.to_string())
398        .fetch_one(&self.pool)
399        .await
400        .map_err(|e| format!("Failed to get next position: {}", e))?;
401
402        let next_pos: i64 = row.get("next_pos");
403
404        sqlx::query(
405            "INSERT INTO node_children (parent_id, child_id, position) VALUES (?, ?, ?)",
406        )
407        .bind(parent_id.to_string())
408        .bind(child_id.to_string())
409        .bind(next_pos)
410        .execute(&self.pool)
411        .await
412        .map_err(|e| format!("Failed to add child to parent: {}", e))?;
413
414        Ok(())
415    }
416
417    /// Get reference information for a tree
418    async fn get_tree_refs(&self, tree_id: &TreeId) -> Result<ResourceRefs, ArborError> {
419        let rows = sqlx::query(
420            "SELECT owner_id, count FROM tree_refs WHERE tree_id = ?",
421        )
422        .bind(tree_id.to_string())
423        .fetch_all(&self.pool)
424        .await
425        .map_err(|e| format!("Failed to fetch tree refs: {}", e))?;
426
427        let mut owners = HashMap::new();
428        let mut total = 0i64;
429
430        for row in rows {
431            let owner_id: String = row.get("owner_id");
432            let count: i64 = row.get("count");
433            owners.insert(owner_id, count);
434            total += count;
435        }
436
437        Ok(ResourceRefs {
438            ref_count: total,
439            owners,
440        })
441    }
442
443    /// Get reference information for a node
444    async fn get_node_refs(&self, node_id: &NodeId) -> Result<ResourceRefs, ArborError> {
445        let rows = sqlx::query(
446            "SELECT owner_id, count FROM node_refs WHERE node_id = ?",
447        )
448        .bind(node_id.to_string())
449        .fetch_all(&self.pool)
450        .await
451        .map_err(|e| format!("Failed to fetch node refs: {}", e))?;
452
453        let mut owners = HashMap::new();
454        let mut total = 0i64;
455
456        for row in rows {
457            let owner_id: String = row.get("owner_id");
458            let count: i64 = row.get("count");
459            owners.insert(owner_id, count);
460            total += count;
461        }
462
463        Ok(ResourceRefs {
464            ref_count: total,
465            owners,
466        })
467    }
468
469    /// List all tree IDs (active only by default)
470    pub async fn tree_list(&self, include_scheduled: bool) -> Result<Vec<TreeId>, ArborError> {
471        let query = if include_scheduled {
472            "SELECT id FROM trees WHERE state IN ('active', 'scheduled_delete')"
473        } else {
474            "SELECT id FROM trees WHERE state = 'active'"
475        };
476
477        let rows = sqlx::query(query)
478            .fetch_all(&self.pool)
479            .await
480            .map_err(|e| format!("Failed to list trees: {}", e))?;
481
482        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
483            .iter()
484            .map(|row| {
485                let id_str: String = row.get("id");
486                ArborId::parse_str(&id_str)
487                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
488            })
489            .collect();
490
491        tree_ids
492    }
493
494    /// List trees scheduled for deletion
495    pub async fn tree_list_scheduled(&self) -> Result<Vec<TreeId>, ArborError> {
496        let rows = sqlx::query(
497            "SELECT id FROM trees WHERE state = 'scheduled_delete'",
498        )
499        .fetch_all(&self.pool)
500        .await
501        .map_err(|e| format!("Failed to list scheduled trees: {}", e))?;
502
503        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
504            .iter()
505            .map(|row| {
506                let id_str: String = row.get("id");
507                ArborId::parse_str(&id_str)
508                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
509            })
510            .collect();
511
512        tree_ids
513    }
514
515    /// List archived trees
516    pub async fn tree_list_archived(&self) -> Result<Vec<TreeId>, ArborError> {
517        let rows = sqlx::query(
518            "SELECT id FROM trees WHERE state = 'archived'",
519        )
520        .fetch_all(&self.pool)
521        .await
522        .map_err(|e| format!("Failed to list archived trees: {}", e))?;
523
524        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
525            .iter()
526            .map(|row| {
527                let id_str: String = row.get("id");
528                ArborId::parse_str(&id_str)
529                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
530            })
531            .collect();
532
533        tree_ids
534    }
535
536    /// Update tree metadata
537    pub async fn tree_update_metadata(
538        &self,
539        tree_id: &TreeId,
540        metadata: Value,
541    ) -> Result<(), ArborError> {
542        let now = current_timestamp();
543        let metadata_json = serde_json::to_string(&metadata)
544            .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
545
546        sqlx::query(
547            "UPDATE trees SET metadata = ?, updated_at = ? WHERE id = ? AND state = 'active'",
548        )
549        .bind(metadata_json)
550        .bind(now)
551        .bind(tree_id.to_string())
552        .execute(&self.pool)
553        .await
554        .map_err(|e| format!("Failed to update tree metadata: {}", e))?;
555
556        Ok(())
557    }
558
559    /// Claim ownership of a tree (increment reference count)
560    pub async fn tree_claim(
561        &self,
562        tree_id: &TreeId,
563        owner_id: &str,
564        count: i64,
565    ) -> Result<i64, ArborError> {
566        let now = current_timestamp();
567        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
568
569        // Check if tree exists and is claimable (active or scheduled_delete)
570        let tree_row = sqlx::query(
571            "SELECT state, scheduled_deletion_at FROM trees WHERE id = ?",
572        )
573        .bind(tree_id.to_string())
574        .fetch_optional(&mut *tx)
575        .await
576        .map_err(|e| format!("Failed to fetch tree: {}", e))?
577        .ok_or_else(|| format!("Tree not found: {}", tree_id))?;
578
579        let state_str: String = tree_row.get("state");
580        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
581
582        if state == ResourceState::Archived {
583            return Err("Cannot claim archived tree".into());
584        }
585
586        // If scheduled for deletion, reactivate it
587        if state == ResourceState::ScheduledDelete {
588            sqlx::query(
589                "UPDATE trees SET state = 'active', scheduled_deletion_at = NULL, updated_at = ? WHERE id = ?",
590            )
591            .bind(now)
592            .bind(tree_id.to_string())
593            .execute(&mut *tx)
594            .await
595            .map_err(|e| format!("Failed to reactivate tree: {}", e))?;
596        }
597
598        // Update or insert tree_ref
599        sqlx::query(
600            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at)
601             VALUES (?, ?, ?, ?)
602             ON CONFLICT(tree_id, owner_id) DO UPDATE SET
603                count = count + excluded.count,
604                claimed_at = excluded.claimed_at",
605        )
606        .bind(tree_id.to_string())
607        .bind(owner_id)
608        .bind(count)
609        .bind(now)
610        .execute(&mut *tx)
611        .await
612        .map_err(|e| format!("Failed to claim tree: {}", e))?;
613
614        // Update tree ref_count
615        sqlx::query(
616            "UPDATE trees SET ref_count = ref_count + ?, updated_at = ? WHERE id = ?",
617        )
618        .bind(count)
619        .bind(now)
620        .bind(tree_id.to_string())
621        .execute(&mut *tx)
622        .await
623        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;
624
625        // Get the new ref_count
626        let new_count_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
627            .bind(tree_id.to_string())
628            .fetch_one(&mut *tx)
629            .await
630            .map_err(|e| format!("Failed to fetch new ref_count: {}", e))?;
631
632        let new_count: i64 = new_count_row.get("ref_count");
633
634        tx.commit().await.map_err(|e| e.to_string())?;
635        Ok(new_count)
636    }
637
638    /// Release ownership of a tree (decrement reference count)
639    pub async fn tree_release(
640        &self,
641        tree_id: &TreeId,
642        owner_id: &str,
643        count: i64,
644    ) -> Result<i64, ArborError> {
645        let now = current_timestamp();
646        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
647
648        // Check current ref count for this owner
649        let owner_ref = sqlx::query(
650            "SELECT count FROM tree_refs WHERE tree_id = ? AND owner_id = ?",
651        )
652        .bind(tree_id.to_string())
653        .bind(owner_id)
654        .fetch_optional(&mut *tx)
655        .await
656        .map_err(|e| format!("Failed to fetch tree ref: {}", e))?
657        .ok_or_else(|| format!("No reference found for owner {}", owner_id))?;
658
659        let current_count: i64 = owner_ref.get("count");
660        if current_count < count {
661            return Err(format!(
662                "Cannot release {} references, owner only has {}",
663                count, current_count
664            )
665            .into());
666        }
667
668        let new_count = current_count - count;
669
670        // Update or delete tree_ref
671        if new_count == 0 {
672            sqlx::query("DELETE FROM tree_refs WHERE tree_id = ? AND owner_id = ?")
673                .bind(tree_id.to_string())
674                .bind(owner_id)
675                .execute(&mut *tx)
676                .await
677                .map_err(|e| format!("Failed to delete tree ref: {}", e))?;
678        } else {
679            sqlx::query(
680                "UPDATE tree_refs SET count = ?, claimed_at = ? WHERE tree_id = ? AND owner_id = ?",
681            )
682            .bind(new_count)
683            .bind(now)
684            .bind(tree_id.to_string())
685            .bind(owner_id)
686            .execute(&mut *tx)
687            .await
688            .map_err(|e| format!("Failed to update tree ref: {}", e))?;
689        }
690
691        // Update tree ref_count
692        sqlx::query(
693            "UPDATE trees SET ref_count = ref_count - ?, updated_at = ? WHERE id = ?",
694        )
695        .bind(count)
696        .bind(now)
697        .bind(tree_id.to_string())
698        .execute(&mut *tx)
699        .await
700        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;
701
702        // Check if ref_count reached 0, schedule for deletion
703        let tree_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
704            .bind(tree_id.to_string())
705            .fetch_one(&mut *tx)
706            .await
707            .map_err(|e| format!("Failed to fetch tree: {}", e))?;
708
709        let ref_count: i64 = tree_row.get("ref_count");
710        if ref_count == 0 {
711            sqlx::query(
712                "UPDATE trees SET state = 'scheduled_delete', scheduled_deletion_at = ?, updated_at = ? WHERE id = ?",
713            )
714            .bind(now)
715            .bind(now)
716            .bind(tree_id.to_string())
717            .execute(&mut *tx)
718            .await
719            .map_err(|e| format!("Failed to schedule tree deletion: {}", e))?;
720        }
721
722        tx.commit().await.map_err(|e| e.to_string())?;
723        Ok(ref_count)
724    }
725
726    /// Create a text node in a tree
727    pub async fn node_create_text(
728        &self,
729        tree_id: &TreeId,
730        parent: Option<NodeId>,
731        content: String,
732        metadata: Option<Value>,
733    ) -> Result<NodeId, ArborError> {
734        let node_id = NodeId::new();
735        let now = current_timestamp();
736
737        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
738
739        sqlx::query(
740            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, metadata, created_at)
741             VALUES (?, ?, ?, 1, 'active', 'text', ?, ?, ?)",
742        )
743        .bind(node_id.to_string())
744        .bind(tree_id.to_string())
745        .bind(parent.map(|p| p.to_string()))
746        .bind(&content)
747        .bind(metadata_json)
748        .bind(now)
749        .execute(&self.pool)
750        .await
751        .map_err(|e| format!("Failed to create text node: {}", e))?;
752
753        // Add to node_children table if parent is specified
754        if let Some(parent_id) = parent {
755            self.add_child_to_parent(&parent_id, &node_id).await?;
756        }
757
758        Ok(node_id)
759    }
760
761    /// Create an external node in a tree
762    pub async fn node_create_external(
763        &self,
764        tree_id: &TreeId,
765        parent: Option<NodeId>,
766        handle: Handle,
767        metadata: Option<Value>,
768    ) -> Result<NodeId, ArborError> {
769        let node_id = NodeId::new();
770        let now = current_timestamp();
771
772        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
773        let meta_json = serde_json::to_string(&handle.meta).unwrap();
774
775        sqlx::query(
776            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
777             VALUES (?, ?, ?, 1, 'active', 'external', ?, ?, ?, ?, ?, ?)",
778        )
779        .bind(node_id.to_string())
780        .bind(tree_id.to_string())
781        .bind(parent.map(|p| p.to_string()))
782        .bind(handle.plugin_id.to_string())
783        .bind(&handle.version)
784        .bind(&handle.method)
785        .bind(&meta_json)
786        .bind(metadata_json)
787        .bind(now)
788        .execute(&self.pool)
789        .await
790        .map_err(|e| format!("Failed to create external node: {}", e))?;
791
792        // Add to node_children table if parent is specified
793        if let Some(parent_id) = parent {
794            self.add_child_to_parent(&parent_id, &node_id).await?;
795        }
796
797        Ok(node_id)
798    }
799
800    /// Create an external node that is already scheduled for deletion (ephemeral)
801    pub async fn node_create_external_ephemeral(
802        &self,
803        tree_id: &TreeId,
804        parent: Option<NodeId>,
805        handle: Handle,
806        metadata: Option<Value>,
807    ) -> Result<NodeId, ArborError> {
808        let node_id = NodeId::new();
809        let now = current_timestamp();
810
811        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
812        let meta_json = serde_json::to_string(&handle.meta).unwrap();
813
814        sqlx::query(
815            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
816             VALUES (?, ?, ?, 0, 'scheduled_delete', ?, 'external', ?, ?, ?, ?, ?, ?)",
817        )
818        .bind(node_id.to_string())
819        .bind(tree_id.to_string())
820        .bind(parent.map(|p| p.to_string()))
821        .bind(now) // scheduled_deletion_at = now (will be cleaned up by cleanup_scheduled_trees)
822        .bind(handle.plugin_id.to_string())
823        .bind(&handle.version)
824        .bind(&handle.method)
825        .bind(&meta_json)
826        .bind(metadata_json)
827        .bind(now)
828        .execute(&self.pool)
829        .await
830        .map_err(|e| format!("Failed to create ephemeral external node: {}", e))?;
831
832        // Add to node_children table if parent is specified
833        if let Some(parent_id) = parent {
834            self.add_child_to_parent(&parent_id, &node_id).await?;
835        }
836
837        Ok(node_id)
838    }
839
840    /// Get a node by ID
841    pub async fn node_get(
842        &self,
843        tree_id: &TreeId,
844        node_id: &NodeId,
845    ) -> Result<Node, ArborError> {
846        let nodes = self.get_nodes_for_tree(tree_id).await?;
847        nodes
848            .get(node_id)
849            .cloned()
850            .ok_or_else(|| format!("Node not found: {}", node_id).into())
851    }
852
853    /// Get children of a node
854    pub async fn node_get_children(
855        &self,
856        tree_id: &TreeId,
857        node_id: &NodeId,
858    ) -> Result<Vec<NodeId>, ArborError> {
859        let rows = sqlx::query(
860            "SELECT id FROM nodes WHERE tree_id = ? AND parent_id = ? AND state = 'active'",
861        )
862        .bind(tree_id.to_string())
863        .bind(node_id.to_string())
864        .fetch_all(&self.pool)
865        .await
866        .map_err(|e| format!("Failed to fetch children: {}", e))?;
867
868        let children: Result<Vec<NodeId>, ArborError> = rows
869            .iter()
870            .map(|row| {
871                let id_str: String = row.get("id");
872                ArborId::parse_str(&id_str)
873                    .map_err(|e| format!("Invalid node ID: {}", e).into())
874            })
875            .collect();
876
877        children
878    }
879
880    /// Get parent of a node
881    pub async fn node_get_parent(
882        &self,
883        tree_id: &TreeId,
884        node_id: &NodeId,
885    ) -> Result<Option<NodeId>, ArborError> {
886        let row = sqlx::query(
887            "SELECT parent_id FROM nodes WHERE tree_id = ? AND id = ?",
888        )
889        .bind(tree_id.to_string())
890        .bind(node_id.to_string())
891        .fetch_optional(&self.pool)
892        .await
893        .map_err(|e| format!("Failed to fetch parent: {}", e))?
894        .ok_or_else(|| format!("Node not found: {}", node_id))?;
895
896        let parent_id: Option<String> = row.get("parent_id");
897        match parent_id {
898            Some(id_str) => Ok(Some(
899                ArborId::parse_str(&id_str)
900                    .map_err(|e| format!("Invalid parent ID: {}", e))?,
901            )),
902            None => Ok(None),
903        }
904    }
905
906    /// Get path from root to a node (list of node IDs)
907    pub async fn node_get_path(
908        &self,
909        tree_id: &TreeId,
910        node_id: &NodeId,
911    ) -> Result<Vec<NodeId>, ArborError> {
912        let mut path = Vec::new();
913        let mut current_id = Some(*node_id);
914
915        // Walk up the tree to root
916        while let Some(id) = current_id {
917            path.push(id);
918            current_id = self.node_get_parent(tree_id, &id).await?;
919        }
920
921        // Reverse to get root-to-node path
922        path.reverse();
923        Ok(path)
924    }
925
926    /// List all leaf nodes in a tree
927    pub async fn context_list_leaves(
928        &self,
929        tree_id: &TreeId,
930    ) -> Result<Vec<NodeId>, ArborError> {
931        let rows = sqlx::query(
932            "SELECT id FROM nodes
933             WHERE tree_id = ? AND state = 'active'
934             AND id NOT IN (SELECT DISTINCT parent_id FROM nodes WHERE parent_id IS NOT NULL AND tree_id = ?)",
935        )
936        .bind(tree_id.to_string())
937        .bind(tree_id.to_string())
938        .fetch_all(&self.pool)
939        .await
940        .map_err(|e| format!("Failed to fetch leaf nodes: {}", e))?;
941
942        let leaves: Result<Vec<NodeId>, ArborError> = rows
943            .iter()
944            .map(|row| {
945                let id_str: String = row.get("id");
946                ArborId::parse_str(&id_str)
947                    .map_err(|e| format!("Invalid node ID: {}", e).into())
948            })
949            .collect();
950
951        leaves
952    }
953
954    /// Get the full path data from root to a node (all node data)
955    pub async fn context_get_path(
956        &self,
957        tree_id: &TreeId,
958        node_id: &NodeId,
959    ) -> Result<Vec<Node>, ArborError> {
960        let path_ids = self.node_get_path(tree_id, node_id).await?;
961        let nodes = self.get_nodes_for_tree(tree_id).await?;
962
963        let path_nodes: Result<Vec<Node>, ArborError> = path_ids
964            .iter()
965            .map(|id| {
966                nodes
967                    .get(id)
968                    .cloned()
969                    .ok_or_else(|| format!("Node not found in path: {}", id).into())
970            })
971            .collect();
972
973        path_nodes
974    }
975
976    /// Get all external handles in the path to a node
977    pub async fn context_get_handles(
978        &self,
979        tree_id: &TreeId,
980        node_id: &NodeId,
981    ) -> Result<Vec<Handle>, ArborError> {
982        let path_nodes = self.context_get_path(tree_id, node_id).await?;
983
984        let handles: Vec<Handle> = path_nodes
985            .iter()
986            .filter_map(|node| match &node.data {
987                NodeType::External { handle } => Some(handle.clone()),
988                NodeType::Text { .. } => None,
989            })
990            .collect();
991
992        Ok(handles)
993    }
994
995    /// Cleanup task: Archive trees scheduled for deletion (after 7 days)
996    pub async fn cleanup_scheduled_trees(&self) -> Result<usize, ArborError> {
997        let now = current_timestamp();
998        let seven_days_ago = now - (7 * 24 * 60 * 60);
999
1000        let result = sqlx::query(
1001            "UPDATE trees
1002             SET state = 'archived', archived_at = ?, updated_at = ?
1003             WHERE state = 'scheduled_delete' AND scheduled_deletion_at < ?",
1004        )
1005        .bind(now)
1006        .bind(now)
1007        .bind(seven_days_ago)
1008        .execute(&self.pool)
1009        .await
1010        .map_err(|e| format!("Failed to archive trees: {}", e))?;
1011
1012        Ok(result.rows_affected() as usize)
1013    }
1014}
1015
/// Get the current Unix timestamp in whole seconds.
///
/// If the system clock is set to a moment before the Unix epoch, the result is the
/// negative number of whole seconds between the clock and the epoch rather than a
/// panic, so storage operations keep working on a misconfigured clock.
fn current_timestamp() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        // `duration()` is how far the clock sits before the epoch.
        Err(before_epoch) => -(before_epoch.duration().as_secs() as i64),
    }
}