Skip to main content

plexus_substrate/activations/arbor/
storage.rs

1use super::types::{
2    ArborError, ArborId, Node, NodeId, NodeType, ResourceRefs, ResourceState, Tree, TreeId, Handle,
3};
4use crate::activations::storage::init_sqlite_pool;
5use crate::activation_db_path_from_module;
6use serde_json::Value;
7use sqlx::{sqlite::SqlitePool, Row};
8use uuid::Uuid;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13/// Configuration for Arbor storage
14#[derive(Debug, Clone)]
15pub struct ArborConfig {
16    /// Duration before scheduled resources move to archived (seconds)
17    pub scheduled_deletion_window: i64, // Default: 7 days = 604800
18
19    /// Duration before archived resources are purged (seconds)
20    pub archive_window: i64, // Default: 30 days = 2592000
21
22    /// Path to SQLite database
23    pub db_path: PathBuf,
24
25    /// Enable auto-cleanup background task
26    pub auto_cleanup: bool,
27
28    /// Cleanup task interval (seconds)
29    pub cleanup_interval: i64, // Default: 1 hour = 3600
30}
31
32impl Default for ArborConfig {
33    fn default() -> Self {
34        Self {
35            scheduled_deletion_window: 604800,  // 7 days
36            archive_window: 2592000,            // 30 days
37            db_path: activation_db_path_from_module!("arbor.db"),
38            auto_cleanup: true,
39            cleanup_interval: 3600,             // 1 hour
40        }
41    }
42}
43
44/// SQLite-based storage for Arbor tree structures.
45///
46/// # Usage Pattern: Direct Injection
47///
48/// ArborStorage is **infrastructure** - activations should receive it directly
49/// at construction time, NOT via Plexus routing.
50///
51/// ```ignore
52/// // Correct: Direct injection
53/// let cone = Cone::new(cone_config, arbor_storage.clone()).await?;
54///
55/// // Then use directly for tree operations
56/// let tree = arbor_storage.tree_get(&tree_id).await?;
57/// let node_id = arbor_storage.node_create_external(&tree_id, parent, handle, None).await?;
58/// ```
59///
60/// **Do NOT** route tree operations through Plexus - that adds unnecessary
61/// serialization overhead for what should be direct method calls.
62///
63/// The **only** case where Plexus is needed for Arbor-related data is
64/// cross-plugin handle resolution: when you have a Handle pointing to
65/// external data (e.g., a Cone message) and need to resolve its content.
66///
67/// See: `docs/architecture/*_arbor-usage-pattern.md`
68pub struct ArborStorage {
69    pool: SqlitePool,
70    #[allow(dead_code)]
71    config: ArborConfig,
72}
73
74impl ArborStorage {
75    /// Create a new storage instance and run migrations
76    pub async fn new(config: ArborConfig) -> Result<Self, ArborError> {
77        let pool = init_sqlite_pool(config.db_path.clone()).await?;
78
79        let storage = Self { pool, config };
80        storage.run_migrations().await?;
81
82        Ok(storage)
83    }
84
85    /// Run database migrations
86    async fn run_migrations(&self) -> Result<(), ArborError> {
87        sqlx::query(
88            r#"
89            CREATE TABLE IF NOT EXISTS trees (
90                id TEXT PRIMARY KEY,
91                root_node_id TEXT NOT NULL,
92                ref_count INTEGER NOT NULL DEFAULT 1,
93                state TEXT NOT NULL DEFAULT 'active',
94                scheduled_deletion_at INTEGER,
95                archived_at INTEGER,
96                created_at INTEGER NOT NULL,
97                updated_at INTEGER NOT NULL,
98                metadata TEXT
99            );
100
101            CREATE TABLE IF NOT EXISTS tree_refs (
102                tree_id TEXT NOT NULL,
103                owner_id TEXT NOT NULL,
104                count INTEGER NOT NULL DEFAULT 1,
105                claimed_at INTEGER NOT NULL,
106                PRIMARY KEY (tree_id, owner_id),
107                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
108            );
109
110            CREATE TABLE IF NOT EXISTS nodes (
111                id TEXT PRIMARY KEY,
112                tree_id TEXT NOT NULL,
113                parent_id TEXT,
114                ref_count INTEGER NOT NULL DEFAULT 1,
115                state TEXT NOT NULL DEFAULT 'active',
116                scheduled_deletion_at INTEGER,
117                archived_at INTEGER,
118                node_type TEXT NOT NULL,
119                content TEXT,
120                handle_plugin_id TEXT,
121                handle_version TEXT,
122                handle_method TEXT,
123                handle_meta TEXT,
124                created_at INTEGER NOT NULL,
125                metadata TEXT,
126                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
127            );
128
129            CREATE TABLE IF NOT EXISTS node_refs (
130                node_id TEXT NOT NULL,
131                owner_id TEXT NOT NULL,
132                count INTEGER NOT NULL DEFAULT 1,
133                claimed_at INTEGER NOT NULL,
134                PRIMARY KEY (node_id, owner_id),
135                FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
136            );
137
138            CREATE TABLE IF NOT EXISTS node_children (
139                parent_id TEXT NOT NULL,
140                child_id TEXT NOT NULL,
141                position INTEGER NOT NULL,
142                PRIMARY KEY (parent_id, child_id),
143                FOREIGN KEY (parent_id) REFERENCES nodes(id) ON DELETE CASCADE,
144                FOREIGN KEY (child_id) REFERENCES nodes(id) ON DELETE CASCADE
145            );
146
147            CREATE INDEX IF NOT EXISTS idx_trees_state ON trees(state);
148            CREATE INDEX IF NOT EXISTS idx_trees_scheduled ON trees(scheduled_deletion_at) WHERE state = 'scheduled_delete';
149            CREATE INDEX IF NOT EXISTS idx_trees_archived ON trees(archived_at) WHERE state = 'archived';
150            CREATE INDEX IF NOT EXISTS idx_nodes_tree ON nodes(tree_id);
151            CREATE INDEX IF NOT EXISTS idx_nodes_parent ON nodes(parent_id);
152            CREATE INDEX IF NOT EXISTS idx_nodes_state ON nodes(state);
153            CREATE INDEX IF NOT EXISTS idx_nodes_scheduled ON nodes(scheduled_deletion_at) WHERE state = 'scheduled_delete';
154            CREATE INDEX IF NOT EXISTS idx_node_children_parent ON node_children(parent_id);
155            CREATE INDEX IF NOT EXISTS idx_node_children_child ON node_children(child_id);
156            "#,
157        )
158        .execute(&self.pool)
159        .await
160        .map_err(|e| ArborError::InitError { detail: format!("Failed to run migrations: {}", e) })?;
161
162        Ok(())
163    }
164
165    // ========================================================================
166    // Tree Operations
167    // ========================================================================
168
169    /// Create a new tree with a root node
170    pub async fn tree_create(
171        &self,
172        metadata: Option<serde_json::Value>,
173        owner_id: &str,
174    ) -> Result<TreeId, ArborError> {
175        self.tree_create_with_id(None, metadata, owner_id).await
176    }
177
178    /// Create a new tree with an optional custom ID (derived from path)
179    pub async fn tree_create_with_id(
180        &self,
181        tree_id: Option<TreeId>,
182        metadata: Option<serde_json::Value>,
183        owner_id: &str,
184    ) -> Result<TreeId, ArborError> {
185        let tree_id = tree_id.unwrap_or_else(TreeId::new);
186        let root_id = NodeId::new();
187        let now = current_timestamp();
188
189        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
190
191        // Create tree
192        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
193        sqlx::query(
194            "INSERT INTO trees (id, root_node_id, ref_count, state, created_at, updated_at, metadata)
195             VALUES (?, ?, 1, 'active', ?, ?, ?)",
196        )
197        .bind(tree_id.to_string())
198        .bind(root_id.to_string())
199        .bind(now)
200        .bind(now)
201        .bind(metadata_json)
202        .execute(&mut *tx)
203        .await
204        .map_err(|e| format!("Failed to create tree: {}", e))?;
205
206        // Create tree ref for owner
207        sqlx::query(
208            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at) VALUES (?, ?, 1, ?)",
209        )
210        .bind(tree_id.to_string())
211        .bind(owner_id)
212        .bind(now)
213        .execute(&mut *tx)
214        .await
215        .map_err(|e| format!("Failed to create tree ref: {}", e))?;
216
217        // Create root node (empty text node)
218        sqlx::query(
219            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, created_at)
220             VALUES (?, ?, NULL, 1, 'active', 'text', '', ?)",
221        )
222        .bind(root_id.to_string())
223        .bind(tree_id.to_string())
224        .bind(now)
225        .execute(&mut *tx)
226        .await
227        .map_err(|e| format!("Failed to create root node: {}", e))?;
228
229        tx.commit().await.map_err(|e| e.to_string())?;
230
231        Ok(tree_id)
232    }
233
234    /// Get a tree by ID (only active trees)
235    pub async fn tree_get(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
236        self.tree_get_internal(tree_id, false).await
237    }
238
239    /// Get an archived tree by ID
240    pub async fn tree_get_archived(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
241        self.tree_get_internal(tree_id, true).await
242    }
243
244    /// Internal tree getter
245    async fn tree_get_internal(
246        &self,
247        tree_id: &TreeId,
248        allow_archived: bool,
249    ) -> Result<Tree, ArborError> {
250        // Get tree metadata
251        let tree_row = sqlx::query(
252            "SELECT id, root_node_id, ref_count, state, scheduled_deletion_at, archived_at, created_at, updated_at, metadata
253             FROM trees WHERE id = ?",
254        )
255        .bind(tree_id.to_string())
256        .fetch_optional(&self.pool)
257        .await
258        .map_err(|e| ArborError::StorageError { operation: "fetch_tree".to_string(), detail: e.to_string() })?
259        .ok_or_else(|| ArborError::TreeNotFound { tree_id: tree_id.to_string() })?;
260
261        let state_str: String = tree_row.get("state");
262        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
263
264        if !allow_archived && state == ResourceState::Archived {
265            return Err(ArborError::InvalidState { message: format!("Tree {} is archived, use tree_get_archived()", tree_id) });
266        }
267
268        let root_node_id: String = tree_row.get("root_node_id");
269        let root_node_id = ArborId::parse_str(&root_node_id)
270            .map_err(|e| format!("Invalid root node ID: {}", e))?;
271
272        // Get all nodes for this tree
273        let nodes = self.get_nodes_for_tree(tree_id).await?;
274
275        // Get reference information
276        let refs = self.get_tree_refs(tree_id).await?;
277
278        let metadata_json: Option<String> = tree_row.get("metadata");
279        let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());
280
281        Ok(Tree {
282            id: *tree_id,
283            root: root_node_id,
284            nodes,
285            state: Some(state),
286            refs: Some(refs),
287            scheduled_deletion_at: tree_row.get("scheduled_deletion_at"),
288            archived_at: tree_row.get("archived_at"),
289            created_at: tree_row.get("created_at"),
290            updated_at: tree_row.get("updated_at"),
291            metadata,
292        })
293    }
294
295    /// Get all nodes for a tree
296    async fn get_nodes_for_tree(&self, tree_id: &TreeId) -> Result<HashMap<NodeId, Node>, ArborError> {
297        let rows = sqlx::query(
298            "SELECT id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, archived_at,
299                    node_type, content, handle_plugin_id, handle_version, handle_method,
300                    handle_meta, created_at, metadata
301             FROM nodes WHERE tree_id = ?",
302        )
303        .bind(tree_id.to_string())
304        .fetch_all(&self.pool)
305        .await
306        .map_err(|e| format!("Failed to fetch nodes: {}", e))?;
307
308        let mut nodes = HashMap::new();
309
310        for row in rows {
311            let node_id_str: String = row.get("id");
312            let node_id = ArborId::parse_str(&node_id_str)
313                .map_err(|e| format!("Invalid node ID: {}", e))?;
314
315            let parent_id_str: Option<String> = row.get("parent_id");
316            let parent_id = parent_id_str
317                .map(|s| ArborId::parse_str(&s).map_err(|e| format!("Invalid parent ID: {}", e)))
318                .transpose()?;
319
320            // Get children for this node
321            let children = self.get_node_children_internal(&node_id).await?;
322
323            let node_type_str: String = row.get("node_type");
324            let data = match node_type_str.as_str() {
325                "text" => {
326                    let content: String = row.get("content");
327                    NodeType::Text { content }
328                }
329                "external" => {
330                    let plugin_id_str: String = row.get("handle_plugin_id");
331                    let plugin_id = Uuid::parse_str(&plugin_id_str)
332                        .map_err(|e| format!("Invalid handle plugin_id: {}", e))?;
333                    let version: String = row.get("handle_version");
334                    let method: String = row.get("handle_method");
335                    let meta_json: Option<String> = row.get("handle_meta");
336                    let meta: Vec<String> = meta_json
337                        .and_then(|s| serde_json::from_str(&s).ok())
338                        .unwrap_or_default();
339
340                    NodeType::External {
341                        handle: Handle::new(plugin_id, version, method)
342                            .with_meta(meta),
343                    }
344                }
345                _ => return Err(format!("Unknown node type: {}", node_type_str).into()),
346            };
347
348            let state_str: String = row.get("state");
349            let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
350
351            let refs = self.get_node_refs(&node_id).await?;
352
353            let metadata_json: Option<String> = row.get("metadata");
354            let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());
355
356            let node = Node {
357                id: node_id,
358                parent: parent_id,
359                children,
360                data,
361                state: Some(state),
362                refs: Some(refs),
363                scheduled_deletion_at: row.get("scheduled_deletion_at"),
364                archived_at: row.get("archived_at"),
365                created_at: row.get("created_at"),
366                metadata,
367            };
368
369            nodes.insert(node_id, node);
370        }
371
372        Ok(nodes)
373    }
374
375    /// Get children for a node (ordered by position)
376    async fn get_node_children_internal(&self, node_id: &NodeId) -> Result<Vec<NodeId>, ArborError> {
377        let rows = sqlx::query(
378            "SELECT child_id FROM node_children WHERE parent_id = ? ORDER BY position",
379        )
380        .bind(node_id.to_string())
381        .fetch_all(&self.pool)
382        .await
383        .map_err(|e| format!("Failed to fetch node children: {}", e))?;
384
385        let children: Result<Vec<NodeId>, ArborError> = rows
386            .iter()
387            .map(|row| {
388                let child_id_str: String = row.get("child_id");
389                ArborId::parse_str(&child_id_str)
390                    .map_err(|e| e.into())
391            })
392            .collect();
393
394        children
395    }
396
397    /// Add a child to a parent in the node_children table
398    async fn add_child_to_parent(&self, parent_id: &NodeId, child_id: &NodeId) -> Result<(), ArborError> {
399        // Get next position for this parent
400        let row = sqlx::query(
401            "SELECT COALESCE(MAX(position), -1) + 1 as next_pos FROM node_children WHERE parent_id = ?",
402        )
403        .bind(parent_id.to_string())
404        .fetch_one(&self.pool)
405        .await
406        .map_err(|e| format!("Failed to get next position: {}", e))?;
407
408        let next_pos: i64 = row.get("next_pos");
409
410        sqlx::query(
411            "INSERT INTO node_children (parent_id, child_id, position) VALUES (?, ?, ?)",
412        )
413        .bind(parent_id.to_string())
414        .bind(child_id.to_string())
415        .bind(next_pos)
416        .execute(&self.pool)
417        .await
418        .map_err(|e| format!("Failed to add child to parent: {}", e))?;
419
420        Ok(())
421    }
422
423    /// Get reference information for a tree
424    async fn get_tree_refs(&self, tree_id: &TreeId) -> Result<ResourceRefs, ArborError> {
425        let rows = sqlx::query(
426            "SELECT owner_id, count FROM tree_refs WHERE tree_id = ?",
427        )
428        .bind(tree_id.to_string())
429        .fetch_all(&self.pool)
430        .await
431        .map_err(|e| format!("Failed to fetch tree refs: {}", e))?;
432
433        let mut owners = HashMap::new();
434        let mut total = 0i64;
435
436        for row in rows {
437            let owner_id: String = row.get("owner_id");
438            let count: i64 = row.get("count");
439            owners.insert(owner_id, count);
440            total += count;
441        }
442
443        Ok(ResourceRefs {
444            ref_count: total,
445            owners,
446        })
447    }
448
449    /// Get reference information for a node
450    async fn get_node_refs(&self, node_id: &NodeId) -> Result<ResourceRefs, ArborError> {
451        let rows = sqlx::query(
452            "SELECT owner_id, count FROM node_refs WHERE node_id = ?",
453        )
454        .bind(node_id.to_string())
455        .fetch_all(&self.pool)
456        .await
457        .map_err(|e| format!("Failed to fetch node refs: {}", e))?;
458
459        let mut owners = HashMap::new();
460        let mut total = 0i64;
461
462        for row in rows {
463            let owner_id: String = row.get("owner_id");
464            let count: i64 = row.get("count");
465            owners.insert(owner_id, count);
466            total += count;
467        }
468
469        Ok(ResourceRefs {
470            ref_count: total,
471            owners,
472        })
473    }
474
475    /// List all tree IDs (active only by default)
476    pub async fn tree_list(&self, include_scheduled: bool) -> Result<Vec<TreeId>, ArborError> {
477        let query = if include_scheduled {
478            "SELECT id FROM trees WHERE state IN ('active', 'scheduled_delete')"
479        } else {
480            "SELECT id FROM trees WHERE state = 'active'"
481        };
482
483        let rows = sqlx::query(query)
484            .fetch_all(&self.pool)
485            .await
486            .map_err(|e| format!("Failed to list trees: {}", e))?;
487
488        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
489            .iter()
490            .map(|row| {
491                let id_str: String = row.get("id");
492                ArborId::parse_str(&id_str)
493                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
494            })
495            .collect();
496
497        tree_ids
498    }
499
500    /// List trees scheduled for deletion
501    pub async fn tree_list_scheduled(&self) -> Result<Vec<TreeId>, ArborError> {
502        let rows = sqlx::query(
503            "SELECT id FROM trees WHERE state = 'scheduled_delete'",
504        )
505        .fetch_all(&self.pool)
506        .await
507        .map_err(|e| format!("Failed to list scheduled trees: {}", e))?;
508
509        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
510            .iter()
511            .map(|row| {
512                let id_str: String = row.get("id");
513                ArborId::parse_str(&id_str)
514                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
515            })
516            .collect();
517
518        tree_ids
519    }
520
521    /// List archived trees
522    pub async fn tree_list_archived(&self) -> Result<Vec<TreeId>, ArborError> {
523        let rows = sqlx::query(
524            "SELECT id FROM trees WHERE state = 'archived'",
525        )
526        .fetch_all(&self.pool)
527        .await
528        .map_err(|e| format!("Failed to list archived trees: {}", e))?;
529
530        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
531            .iter()
532            .map(|row| {
533                let id_str: String = row.get("id");
534                ArborId::parse_str(&id_str)
535                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
536            })
537            .collect();
538
539        tree_ids
540    }
541
542    /// Update tree metadata
543    pub async fn tree_update_metadata(
544        &self,
545        tree_id: &TreeId,
546        metadata: Value,
547    ) -> Result<(), ArborError> {
548        let now = current_timestamp();
549        let metadata_json = serde_json::to_string(&metadata)
550            .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
551
552        sqlx::query(
553            "UPDATE trees SET metadata = ?, updated_at = ? WHERE id = ? AND state = 'active'",
554        )
555        .bind(metadata_json)
556        .bind(now)
557        .bind(tree_id.to_string())
558        .execute(&self.pool)
559        .await
560        .map_err(|e| format!("Failed to update tree metadata: {}", e))?;
561
562        Ok(())
563    }
564
565    /// Claim ownership of a tree (increment reference count)
566    pub async fn tree_claim(
567        &self,
568        tree_id: &TreeId,
569        owner_id: &str,
570        count: i64,
571    ) -> Result<i64, ArborError> {
572        let now = current_timestamp();
573        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
574
575        // Check if tree exists and is claimable (active or scheduled_delete)
576        let tree_row = sqlx::query(
577            "SELECT state, scheduled_deletion_at FROM trees WHERE id = ?",
578        )
579        .bind(tree_id.to_string())
580        .fetch_optional(&mut *tx)
581        .await
582        .map_err(|e| ArborError::StorageError { operation: "fetch_tree".to_string(), detail: e.to_string() })?
583        .ok_or_else(|| ArborError::TreeNotFound { tree_id: tree_id.to_string() })?;
584
585        let state_str: String = tree_row.get("state");
586        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
587
588        if state == ResourceState::Archived {
589            return Err(ArborError::InvalidState { message: format!("Cannot claim archived tree {}", tree_id) });
590        }
591
592        // If scheduled for deletion, reactivate it
593        if state == ResourceState::ScheduledDelete {
594            sqlx::query(
595                "UPDATE trees SET state = 'active', scheduled_deletion_at = NULL, updated_at = ? WHERE id = ?",
596            )
597            .bind(now)
598            .bind(tree_id.to_string())
599            .execute(&mut *tx)
600            .await
601            .map_err(|e| format!("Failed to reactivate tree: {}", e))?;
602        }
603
604        // Update or insert tree_ref
605        sqlx::query(
606            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at)
607             VALUES (?, ?, ?, ?)
608             ON CONFLICT(tree_id, owner_id) DO UPDATE SET
609                count = count + excluded.count,
610                claimed_at = excluded.claimed_at",
611        )
612        .bind(tree_id.to_string())
613        .bind(owner_id)
614        .bind(count)
615        .bind(now)
616        .execute(&mut *tx)
617        .await
618        .map_err(|e| format!("Failed to claim tree: {}", e))?;
619
620        // Update tree ref_count
621        sqlx::query(
622            "UPDATE trees SET ref_count = ref_count + ?, updated_at = ? WHERE id = ?",
623        )
624        .bind(count)
625        .bind(now)
626        .bind(tree_id.to_string())
627        .execute(&mut *tx)
628        .await
629        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;
630
631        // Get the new ref_count
632        let new_count_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
633            .bind(tree_id.to_string())
634            .fetch_one(&mut *tx)
635            .await
636            .map_err(|e| format!("Failed to fetch new ref_count: {}", e))?;
637
638        let new_count: i64 = new_count_row.get("ref_count");
639
640        tx.commit().await.map_err(|e| e.to_string())?;
641        Ok(new_count)
642    }
643
644    /// Release ownership of a tree (decrement reference count)
645    pub async fn tree_release(
646        &self,
647        tree_id: &TreeId,
648        owner_id: &str,
649        count: i64,
650    ) -> Result<i64, ArborError> {
651        let now = current_timestamp();
652        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
653
654        // Check current ref count for this owner
655        let owner_ref = sqlx::query(
656            "SELECT count FROM tree_refs WHERE tree_id = ? AND owner_id = ?",
657        )
658        .bind(tree_id.to_string())
659        .bind(owner_id)
660        .fetch_optional(&mut *tx)
661        .await
662        .map_err(|e| format!("Failed to fetch tree ref: {}", e))?
663        .ok_or_else(|| format!("No reference found for owner {}", owner_id))?;
664
665        let current_count: i64 = owner_ref.get("count");
666        if current_count < count {
667            return Err(format!(
668                "Cannot release {} references, owner only has {}",
669                count, current_count
670            )
671            .into());
672        }
673
674        let new_count = current_count - count;
675
676        // Update or delete tree_ref
677        if new_count == 0 {
678            sqlx::query("DELETE FROM tree_refs WHERE tree_id = ? AND owner_id = ?")
679                .bind(tree_id.to_string())
680                .bind(owner_id)
681                .execute(&mut *tx)
682                .await
683                .map_err(|e| format!("Failed to delete tree ref: {}", e))?;
684        } else {
685            sqlx::query(
686                "UPDATE tree_refs SET count = ?, claimed_at = ? WHERE tree_id = ? AND owner_id = ?",
687            )
688            .bind(new_count)
689            .bind(now)
690            .bind(tree_id.to_string())
691            .bind(owner_id)
692            .execute(&mut *tx)
693            .await
694            .map_err(|e| format!("Failed to update tree ref: {}", e))?;
695        }
696
697        // Update tree ref_count
698        sqlx::query(
699            "UPDATE trees SET ref_count = ref_count - ?, updated_at = ? WHERE id = ?",
700        )
701        .bind(count)
702        .bind(now)
703        .bind(tree_id.to_string())
704        .execute(&mut *tx)
705        .await
706        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;
707
708        // Check if ref_count reached 0, schedule for deletion
709        let tree_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
710            .bind(tree_id.to_string())
711            .fetch_one(&mut *tx)
712            .await
713            .map_err(|e| format!("Failed to fetch tree: {}", e))?;
714
715        let ref_count: i64 = tree_row.get("ref_count");
716        if ref_count == 0 {
717            sqlx::query(
718                "UPDATE trees SET state = 'scheduled_delete', scheduled_deletion_at = ?, updated_at = ? WHERE id = ?",
719            )
720            .bind(now)
721            .bind(now)
722            .bind(tree_id.to_string())
723            .execute(&mut *tx)
724            .await
725            .map_err(|e| format!("Failed to schedule tree deletion: {}", e))?;
726        }
727
728        tx.commit().await.map_err(|e| e.to_string())?;
729        Ok(ref_count)
730    }
731
732    /// Create a text node in a tree
733    pub async fn node_create_text(
734        &self,
735        tree_id: &TreeId,
736        parent: Option<NodeId>,
737        content: String,
738        metadata: Option<Value>,
739    ) -> Result<NodeId, ArborError> {
740        let node_id = NodeId::new();
741        let now = current_timestamp();
742
743        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
744
745        sqlx::query(
746            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, metadata, created_at)
747             VALUES (?, ?, ?, 1, 'active', 'text', ?, ?, ?)",
748        )
749        .bind(node_id.to_string())
750        .bind(tree_id.to_string())
751        .bind(parent.map(|p| p.to_string()))
752        .bind(&content)
753        .bind(metadata_json)
754        .bind(now)
755        .execute(&self.pool)
756        .await
757        .map_err(|e| format!("Failed to create text node: {}", e))?;
758
759        // Add to node_children table if parent is specified
760        if let Some(parent_id) = parent {
761            self.add_child_to_parent(&parent_id, &node_id).await?;
762        }
763
764        Ok(node_id)
765    }
766
767    /// Create an external node in a tree
768    pub async fn node_create_external(
769        &self,
770        tree_id: &TreeId,
771        parent: Option<NodeId>,
772        handle: Handle,
773        metadata: Option<Value>,
774    ) -> Result<NodeId, ArborError> {
775        let node_id = NodeId::new();
776        let now = current_timestamp();
777
778        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
779        let meta_json = serde_json::to_string(&handle.meta).unwrap();
780
781        sqlx::query(
782            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
783             VALUES (?, ?, ?, 1, 'active', 'external', ?, ?, ?, ?, ?, ?)",
784        )
785        .bind(node_id.to_string())
786        .bind(tree_id.to_string())
787        .bind(parent.map(|p| p.to_string()))
788        .bind(handle.plugin_id.to_string())
789        .bind(&handle.version)
790        .bind(&handle.method)
791        .bind(&meta_json)
792        .bind(metadata_json)
793        .bind(now)
794        .execute(&self.pool)
795        .await
796        .map_err(|e| format!("Failed to create external node: {}", e))?;
797
798        // Add to node_children table if parent is specified
799        if let Some(parent_id) = parent {
800            self.add_child_to_parent(&parent_id, &node_id).await?;
801        }
802
803        Ok(node_id)
804    }
805
806    /// Create an external node that is already scheduled for deletion (ephemeral)
807    pub async fn node_create_external_ephemeral(
808        &self,
809        tree_id: &TreeId,
810        parent: Option<NodeId>,
811        handle: Handle,
812        metadata: Option<Value>,
813    ) -> Result<NodeId, ArborError> {
814        let node_id = NodeId::new();
815        let now = current_timestamp();
816
817        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
818        let meta_json = serde_json::to_string(&handle.meta).unwrap();
819
820        sqlx::query(
821            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
822             VALUES (?, ?, ?, 0, 'scheduled_delete', ?, 'external', ?, ?, ?, ?, ?, ?)",
823        )
824        .bind(node_id.to_string())
825        .bind(tree_id.to_string())
826        .bind(parent.map(|p| p.to_string()))
827        .bind(now) // scheduled_deletion_at = now (will be cleaned up by cleanup_scheduled_trees)
828        .bind(handle.plugin_id.to_string())
829        .bind(&handle.version)
830        .bind(&handle.method)
831        .bind(&meta_json)
832        .bind(metadata_json)
833        .bind(now)
834        .execute(&self.pool)
835        .await
836        .map_err(|e| format!("Failed to create ephemeral external node: {}", e))?;
837
838        // Add to node_children table if parent is specified
839        if let Some(parent_id) = parent {
840            self.add_child_to_parent(&parent_id, &node_id).await?;
841        }
842
843        Ok(node_id)
844    }
845
846    /// Get a node by ID
847    pub async fn node_get(
848        &self,
849        tree_id: &TreeId,
850        node_id: &NodeId,
851    ) -> Result<Node, ArborError> {
852        let nodes = self.get_nodes_for_tree(tree_id).await?;
853        nodes
854            .get(node_id)
855            .cloned()
856            .ok_or_else(|| ArborError::NodeNotFound { node_id: node_id.to_string(), tree_id: tree_id.to_string() })
857    }
858
859    /// Get children of a node
860    pub async fn node_get_children(
861        &self,
862        tree_id: &TreeId,
863        node_id: &NodeId,
864    ) -> Result<Vec<NodeId>, ArborError> {
865        let rows = sqlx::query(
866            "SELECT id FROM nodes WHERE tree_id = ? AND parent_id = ? AND state = 'active'",
867        )
868        .bind(tree_id.to_string())
869        .bind(node_id.to_string())
870        .fetch_all(&self.pool)
871        .await
872        .map_err(|e| format!("Failed to fetch children: {}", e))?;
873
874        let children: Result<Vec<NodeId>, ArborError> = rows
875            .iter()
876            .map(|row| {
877                let id_str: String = row.get("id");
878                ArborId::parse_str(&id_str)
879                    .map_err(|e| format!("Invalid node ID: {}", e).into())
880            })
881            .collect();
882
883        children
884    }
885
886    /// Get parent of a node
887    pub async fn node_get_parent(
888        &self,
889        tree_id: &TreeId,
890        node_id: &NodeId,
891    ) -> Result<Option<NodeId>, ArborError> {
892        let row = sqlx::query(
893            "SELECT parent_id FROM nodes WHERE tree_id = ? AND id = ?",
894        )
895        .bind(tree_id.to_string())
896        .bind(node_id.to_string())
897        .fetch_optional(&self.pool)
898        .await
899        .map_err(|e| ArborError::StorageError { operation: "fetch_parent".to_string(), detail: e.to_string() })?
900        .ok_or_else(|| ArborError::NodeNotFound { node_id: node_id.to_string(), tree_id: tree_id.to_string() })?;
901
902        let parent_id: Option<String> = row.get("parent_id");
903        match parent_id {
904            Some(id_str) => Ok(Some(
905                ArborId::parse_str(&id_str)
906                    .map_err(|e| format!("Invalid parent ID: {}", e))?,
907            )),
908            None => Ok(None),
909        }
910    }
911
912    /// Get path from root to a node (list of node IDs)
913    pub async fn node_get_path(
914        &self,
915        tree_id: &TreeId,
916        node_id: &NodeId,
917    ) -> Result<Vec<NodeId>, ArborError> {
918        let mut path = Vec::new();
919        let mut current_id = Some(*node_id);
920
921        // Walk up the tree to root
922        while let Some(id) = current_id {
923            path.push(id);
924            current_id = self.node_get_parent(tree_id, &id).await?;
925        }
926
927        // Reverse to get root-to-node path
928        path.reverse();
929        Ok(path)
930    }
931
932    /// List all leaf nodes in a tree
933    pub async fn context_list_leaves(
934        &self,
935        tree_id: &TreeId,
936    ) -> Result<Vec<NodeId>, ArborError> {
937        let rows = sqlx::query(
938            "SELECT id FROM nodes
939             WHERE tree_id = ? AND state = 'active'
940             AND id NOT IN (SELECT DISTINCT parent_id FROM nodes WHERE parent_id IS NOT NULL AND tree_id = ?)",
941        )
942        .bind(tree_id.to_string())
943        .bind(tree_id.to_string())
944        .fetch_all(&self.pool)
945        .await
946        .map_err(|e| format!("Failed to fetch leaf nodes: {}", e))?;
947
948        let leaves: Result<Vec<NodeId>, ArborError> = rows
949            .iter()
950            .map(|row| {
951                let id_str: String = row.get("id");
952                ArborId::parse_str(&id_str)
953                    .map_err(|e| format!("Invalid node ID: {}", e).into())
954            })
955            .collect();
956
957        leaves
958    }
959
960    /// Get the full path data from root to a node (all node data)
961    pub async fn context_get_path(
962        &self,
963        tree_id: &TreeId,
964        node_id: &NodeId,
965    ) -> Result<Vec<Node>, ArborError> {
966        let path_ids = self.node_get_path(tree_id, node_id).await?;
967        let nodes = self.get_nodes_for_tree(tree_id).await?;
968
969        let path_nodes: Result<Vec<Node>, ArborError> = path_ids
970            .iter()
971            .map(|id| {
972                nodes
973                    .get(id)
974                    .cloned()
975                    .ok_or_else(|| ArborError::NodeNotFound { node_id: id.to_string(), tree_id: tree_id.to_string() })
976            })
977            .collect();
978
979        path_nodes
980    }
981
982    /// Get all external handles in the path to a node
983    pub async fn context_get_handles(
984        &self,
985        tree_id: &TreeId,
986        node_id: &NodeId,
987    ) -> Result<Vec<Handle>, ArborError> {
988        let path_nodes = self.context_get_path(tree_id, node_id).await?;
989
990        let handles: Vec<Handle> = path_nodes
991            .iter()
992            .filter_map(|node| match &node.data {
993                NodeType::External { handle } => Some(handle.clone()),
994                NodeType::Text { .. } => None,
995            })
996            .collect();
997
998        Ok(handles)
999    }
1000
1001    /// Cleanup task: Archive trees scheduled for deletion (after 7 days)
1002    pub async fn cleanup_scheduled_trees(&self) -> Result<usize, ArborError> {
1003        let now = current_timestamp();
1004        let seven_days_ago = now - (7 * 24 * 60 * 60);
1005
1006        let result = sqlx::query(
1007            "UPDATE trees
1008             SET state = 'archived', archived_at = ?, updated_at = ?
1009             WHERE state = 'scheduled_delete' AND scheduled_deletion_at < ?",
1010        )
1011        .bind(now)
1012        .bind(now)
1013        .bind(seven_days_ago)
1014        .execute(&self.pool)
1015        .await
1016        .map_err(|e| format!("Failed to archive trees: {}", e))?;
1017
1018        Ok(result.rows_affected() as usize)
1019    }
1020
1021    /// Query trees by metadata filter (e.g., {"type": "orcha_monitor"})
1022    pub async fn tree_query_by_metadata(&self, filter: &Value) -> Result<Vec<TreeId>, ArborError> {
1023        let rows = sqlx::query(
1024            "SELECT id, metadata FROM trees WHERE state = 'active'"
1025        )
1026        .fetch_all(&self.pool)
1027        .await
1028        .map_err(|e| format!("Failed to query trees: {}", e))?;
1029
1030        let mut matching_trees = Vec::new();
1031
1032        for row in rows {
1033            let tree_id_str: String = row.get("id");
1034            let metadata_str: Option<String> = row.get("metadata");
1035
1036            if let Some(metadata_json) = metadata_str {
1037                if let Ok(metadata) = serde_json::from_str::<Value>(&metadata_json) {
1038                    // Check if metadata matches filter
1039                    if metadata_matches(filter, &metadata) {
1040                        if let Ok(tree_id) = ArborId::parse_str(&tree_id_str) {
1041                            matching_trees.push(tree_id);
1042                        }
1043                    }
1044                }
1045            }
1046        }
1047
1048        Ok(matching_trees)
1049    }
1050}
1051
1052/// Helper to check if metadata matches a filter
1053fn metadata_matches(filter: &Value, metadata: &Value) -> bool {
1054    match (filter, metadata) {
1055        (Value::Object(filter_obj), Value::Object(meta_obj)) => {
1056            // Check if all filter keys exist in metadata with matching values
1057            filter_obj.iter().all(|(key, filter_value)| {
1058                meta_obj.get(key).map_or(false, |meta_value| {
1059                    metadata_matches(filter_value, meta_value)
1060                })
1061            })
1062        }
1063        (Value::Array(filter_arr), Value::Array(meta_arr)) => {
1064            // Arrays must match exactly (for simplicity)
1065            filter_arr == meta_arr
1066        }
1067        (filter_val, meta_val) => filter_val == meta_val,
1068    }
1069}
1070
1071/// Get current Unix timestamp in seconds
1072fn current_timestamp() -> i64 {
1073    SystemTime::now()
1074        .duration_since(UNIX_EPOCH)
1075        .unwrap()
1076        .as_secs() as i64
1077}