Skip to main content

plexus_substrate/activations/arbor/
storage.rs

1use super::types::{
2    ArborError, ArborId, Node, NodeId, NodeType, ResourceRefs, ResourceState, Tree, TreeId, Handle,
3};
4use serde_json::Value;
5use sqlx::{sqlite::{SqliteConnectOptions, SqlitePool}, ConnectOptions, Row};
6use uuid::Uuid;
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::time::{SystemTime, UNIX_EPOCH};
10
11/// Configuration for Arbor storage
12#[derive(Debug, Clone)]
13pub struct ArborConfig {
14    /// Duration before scheduled resources move to archived (seconds)
15    pub scheduled_deletion_window: i64, // Default: 7 days = 604800
16
17    /// Duration before archived resources are purged (seconds)
18    pub archive_window: i64, // Default: 30 days = 2592000
19
20    /// Path to SQLite database
21    pub db_path: PathBuf,
22
23    /// Enable auto-cleanup background task
24    pub auto_cleanup: bool,
25
26    /// Cleanup task interval (seconds)
27    pub cleanup_interval: i64, // Default: 1 hour = 3600
28}
29
30impl Default for ArborConfig {
31    fn default() -> Self {
32        Self {
33            scheduled_deletion_window: 604800,  // 7 days
34            archive_window: 2592000,            // 30 days
35            db_path: PathBuf::from("arbor.db"),
36            auto_cleanup: true,
37            cleanup_interval: 3600,             // 1 hour
38        }
39    }
40}
41
42/// SQLite-based storage for Arbor tree structures.
43///
44/// # Usage Pattern: Direct Injection
45///
46/// ArborStorage is **infrastructure** - activations should receive it directly
47/// at construction time, NOT via Plexus routing.
48///
49/// ```ignore
50/// // Correct: Direct injection
51/// let cone = Cone::new(cone_config, arbor_storage.clone()).await?;
52///
53/// // Then use directly for tree operations
54/// let tree = arbor_storage.tree_get(&tree_id).await?;
55/// let node_id = arbor_storage.node_create_external(&tree_id, parent, handle, None).await?;
56/// ```
57///
58/// **Do NOT** route tree operations through Plexus - that adds unnecessary
59/// serialization overhead for what should be direct method calls.
60///
61/// The **only** case where Plexus is needed for Arbor-related data is
62/// cross-plugin handle resolution: when you have a Handle pointing to
63/// external data (e.g., a Cone message) and need to resolve its content.
64///
65/// See: `docs/architecture/*_arbor-usage-pattern.md`
66pub struct ArborStorage {
67    pool: SqlitePool,
68    #[allow(dead_code)]
69    config: ArborConfig,
70}
71
72impl ArborStorage {
73    /// Create a new storage instance and run migrations
74    pub async fn new(config: ArborConfig) -> Result<Self, ArborError> {
75        let db_url = format!("sqlite:{}?mode=rwc", config.db_path.display());
76        let connect_options: SqliteConnectOptions = db_url.parse()
77            .map_err(|e| format!("Failed to parse database URL: {}", e))?;
78        let connect_options = connect_options.disable_statement_logging();
79        let pool = SqlitePool::connect_with(connect_options.clone())
80            .await
81            .map_err(|e| format!("Failed to connect to database: {}", e))?;
82
83        let storage = Self { pool, config };
84        storage.run_migrations().await?;
85
86        Ok(storage)
87    }
88
89    /// Run database migrations
90    async fn run_migrations(&self) -> Result<(), ArborError> {
91        sqlx::query(
92            r#"
93            CREATE TABLE IF NOT EXISTS trees (
94                id TEXT PRIMARY KEY,
95                root_node_id TEXT NOT NULL,
96                ref_count INTEGER NOT NULL DEFAULT 1,
97                state TEXT NOT NULL DEFAULT 'active',
98                scheduled_deletion_at INTEGER,
99                archived_at INTEGER,
100                created_at INTEGER NOT NULL,
101                updated_at INTEGER NOT NULL,
102                metadata TEXT
103            );
104
105            CREATE TABLE IF NOT EXISTS tree_refs (
106                tree_id TEXT NOT NULL,
107                owner_id TEXT NOT NULL,
108                count INTEGER NOT NULL DEFAULT 1,
109                claimed_at INTEGER NOT NULL,
110                PRIMARY KEY (tree_id, owner_id),
111                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
112            );
113
114            CREATE TABLE IF NOT EXISTS nodes (
115                id TEXT PRIMARY KEY,
116                tree_id TEXT NOT NULL,
117                parent_id TEXT,
118                ref_count INTEGER NOT NULL DEFAULT 1,
119                state TEXT NOT NULL DEFAULT 'active',
120                scheduled_deletion_at INTEGER,
121                archived_at INTEGER,
122                node_type TEXT NOT NULL,
123                content TEXT,
124                handle_plugin_id TEXT,
125                handle_version TEXT,
126                handle_method TEXT,
127                handle_meta TEXT,
128                created_at INTEGER NOT NULL,
129                metadata TEXT,
130                FOREIGN KEY (tree_id) REFERENCES trees(id) ON DELETE CASCADE
131            );
132
133            CREATE TABLE IF NOT EXISTS node_refs (
134                node_id TEXT NOT NULL,
135                owner_id TEXT NOT NULL,
136                count INTEGER NOT NULL DEFAULT 1,
137                claimed_at INTEGER NOT NULL,
138                PRIMARY KEY (node_id, owner_id),
139                FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
140            );
141
142            CREATE TABLE IF NOT EXISTS node_children (
143                parent_id TEXT NOT NULL,
144                child_id TEXT NOT NULL,
145                position INTEGER NOT NULL,
146                PRIMARY KEY (parent_id, child_id),
147                FOREIGN KEY (parent_id) REFERENCES nodes(id) ON DELETE CASCADE,
148                FOREIGN KEY (child_id) REFERENCES nodes(id) ON DELETE CASCADE
149            );
150
151            CREATE INDEX IF NOT EXISTS idx_trees_state ON trees(state);
152            CREATE INDEX IF NOT EXISTS idx_trees_scheduled ON trees(scheduled_deletion_at) WHERE state = 'scheduled_delete';
153            CREATE INDEX IF NOT EXISTS idx_trees_archived ON trees(archived_at) WHERE state = 'archived';
154            CREATE INDEX IF NOT EXISTS idx_nodes_tree ON nodes(tree_id);
155            CREATE INDEX IF NOT EXISTS idx_nodes_parent ON nodes(parent_id);
156            CREATE INDEX IF NOT EXISTS idx_nodes_state ON nodes(state);
157            CREATE INDEX IF NOT EXISTS idx_nodes_scheduled ON nodes(scheduled_deletion_at) WHERE state = 'scheduled_delete';
158            CREATE INDEX IF NOT EXISTS idx_node_children_parent ON node_children(parent_id);
159            CREATE INDEX IF NOT EXISTS idx_node_children_child ON node_children(child_id);
160            "#,
161        )
162        .execute(&self.pool)
163        .await
164        .map_err(|e| format!("Failed to run migrations: {}", e))?;
165
166        Ok(())
167    }
168
169    // ========================================================================
170    // Tree Operations
171    // ========================================================================
172
173    /// Create a new tree with a root node
174    pub async fn tree_create(
175        &self,
176        metadata: Option<serde_json::Value>,
177        owner_id: &str,
178    ) -> Result<TreeId, ArborError> {
179        self.tree_create_with_id(None, metadata, owner_id).await
180    }
181
182    /// Create a new tree with an optional custom ID (derived from path)
183    pub async fn tree_create_with_id(
184        &self,
185        tree_id: Option<TreeId>,
186        metadata: Option<serde_json::Value>,
187        owner_id: &str,
188    ) -> Result<TreeId, ArborError> {
189        let tree_id = tree_id.unwrap_or_else(TreeId::new);
190        let root_id = NodeId::new();
191        let now = current_timestamp();
192
193        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
194
195        // Create tree
196        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
197        sqlx::query(
198            "INSERT INTO trees (id, root_node_id, ref_count, state, created_at, updated_at, metadata)
199             VALUES (?, ?, 1, 'active', ?, ?, ?)",
200        )
201        .bind(tree_id.to_string())
202        .bind(root_id.to_string())
203        .bind(now)
204        .bind(now)
205        .bind(metadata_json)
206        .execute(&mut *tx)
207        .await
208        .map_err(|e| format!("Failed to create tree: {}", e))?;
209
210        // Create tree ref for owner
211        sqlx::query(
212            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at) VALUES (?, ?, 1, ?)",
213        )
214        .bind(tree_id.to_string())
215        .bind(owner_id)
216        .bind(now)
217        .execute(&mut *tx)
218        .await
219        .map_err(|e| format!("Failed to create tree ref: {}", e))?;
220
221        // Create root node (empty text node)
222        sqlx::query(
223            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, created_at)
224             VALUES (?, ?, NULL, 1, 'active', 'text', '', ?)",
225        )
226        .bind(root_id.to_string())
227        .bind(tree_id.to_string())
228        .bind(now)
229        .execute(&mut *tx)
230        .await
231        .map_err(|e| format!("Failed to create root node: {}", e))?;
232
233        tx.commit().await.map_err(|e| e.to_string())?;
234
235        Ok(tree_id)
236    }
237
238    /// Get a tree by ID (only active trees)
239    pub async fn tree_get(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
240        self.tree_get_internal(tree_id, false).await
241    }
242
243    /// Get an archived tree by ID
244    pub async fn tree_get_archived(&self, tree_id: &TreeId) -> Result<Tree, ArborError> {
245        self.tree_get_internal(tree_id, true).await
246    }
247
248    /// Internal tree getter
249    async fn tree_get_internal(
250        &self,
251        tree_id: &TreeId,
252        allow_archived: bool,
253    ) -> Result<Tree, ArborError> {
254        // Get tree metadata
255        let tree_row = sqlx::query(
256            "SELECT id, root_node_id, ref_count, state, scheduled_deletion_at, archived_at, created_at, updated_at, metadata
257             FROM trees WHERE id = ?",
258        )
259        .bind(tree_id.to_string())
260        .fetch_optional(&self.pool)
261        .await
262        .map_err(|e| format!("Failed to fetch tree: {}", e))?
263        .ok_or_else(|| format!("Tree not found: {}", tree_id))?;
264
265        let state_str: String = tree_row.get("state");
266        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
267
268        if !allow_archived && state == ResourceState::Archived {
269            return Err("Tree is archived, use tree_get_archived()".into());
270        }
271
272        let root_node_id: String = tree_row.get("root_node_id");
273        let root_node_id = ArborId::parse_str(&root_node_id)
274            .map_err(|e| format!("Invalid root node ID: {}", e))?;
275
276        // Get all nodes for this tree
277        let nodes = self.get_nodes_for_tree(tree_id).await?;
278
279        // Get reference information
280        let refs = self.get_tree_refs(tree_id).await?;
281
282        let metadata_json: Option<String> = tree_row.get("metadata");
283        let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());
284
285        Ok(Tree {
286            id: *tree_id,
287            root: root_node_id,
288            nodes,
289            state: Some(state),
290            refs: Some(refs),
291            scheduled_deletion_at: tree_row.get("scheduled_deletion_at"),
292            archived_at: tree_row.get("archived_at"),
293            created_at: tree_row.get("created_at"),
294            updated_at: tree_row.get("updated_at"),
295            metadata,
296        })
297    }
298
299    /// Get all nodes for a tree
300    async fn get_nodes_for_tree(&self, tree_id: &TreeId) -> Result<HashMap<NodeId, Node>, ArborError> {
301        let rows = sqlx::query(
302            "SELECT id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, archived_at,
303                    node_type, content, handle_plugin_id, handle_version, handle_method,
304                    handle_meta, created_at, metadata
305             FROM nodes WHERE tree_id = ?",
306        )
307        .bind(tree_id.to_string())
308        .fetch_all(&self.pool)
309        .await
310        .map_err(|e| format!("Failed to fetch nodes: {}", e))?;
311
312        let mut nodes = HashMap::new();
313
314        for row in rows {
315            let node_id_str: String = row.get("id");
316            let node_id = ArborId::parse_str(&node_id_str)
317                .map_err(|e| format!("Invalid node ID: {}", e))?;
318
319            let parent_id_str: Option<String> = row.get("parent_id");
320            let parent_id = parent_id_str
321                .map(|s| ArborId::parse_str(&s).map_err(|e| format!("Invalid parent ID: {}", e)))
322                .transpose()?;
323
324            // Get children for this node
325            let children = self.get_node_children_internal(&node_id).await?;
326
327            let node_type_str: String = row.get("node_type");
328            let data = match node_type_str.as_str() {
329                "text" => {
330                    let content: String = row.get("content");
331                    NodeType::Text { content }
332                }
333                "external" => {
334                    let plugin_id_str: String = row.get("handle_plugin_id");
335                    let plugin_id = Uuid::parse_str(&plugin_id_str)
336                        .map_err(|e| format!("Invalid handle plugin_id: {}", e))?;
337                    let version: String = row.get("handle_version");
338                    let method: String = row.get("handle_method");
339                    let meta_json: Option<String> = row.get("handle_meta");
340                    let meta: Vec<String> = meta_json
341                        .and_then(|s| serde_json::from_str(&s).ok())
342                        .unwrap_or_default();
343
344                    NodeType::External {
345                        handle: Handle::new(plugin_id, version, method)
346                            .with_meta(meta),
347                    }
348                }
349                _ => return Err(format!("Unknown node type: {}", node_type_str).into()),
350            };
351
352            let state_str: String = row.get("state");
353            let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
354
355            let refs = self.get_node_refs(&node_id).await?;
356
357            let metadata_json: Option<String> = row.get("metadata");
358            let metadata = metadata_json.and_then(|s| serde_json::from_str(&s).ok());
359
360            let node = Node {
361                id: node_id,
362                parent: parent_id,
363                children,
364                data,
365                state: Some(state),
366                refs: Some(refs),
367                scheduled_deletion_at: row.get("scheduled_deletion_at"),
368                archived_at: row.get("archived_at"),
369                created_at: row.get("created_at"),
370                metadata,
371            };
372
373            nodes.insert(node_id, node);
374        }
375
376        Ok(nodes)
377    }
378
379    /// Get children for a node (ordered by position)
380    async fn get_node_children_internal(&self, node_id: &NodeId) -> Result<Vec<NodeId>, ArborError> {
381        let rows = sqlx::query(
382            "SELECT child_id FROM node_children WHERE parent_id = ? ORDER BY position",
383        )
384        .bind(node_id.to_string())
385        .fetch_all(&self.pool)
386        .await
387        .map_err(|e| format!("Failed to fetch node children: {}", e))?;
388
389        let children: Result<Vec<NodeId>, ArborError> = rows
390            .iter()
391            .map(|row| {
392                let child_id_str: String = row.get("child_id");
393                ArborId::parse_str(&child_id_str)
394                    .map_err(|e| e.into())
395            })
396            .collect();
397
398        children
399    }
400
401    /// Add a child to a parent in the node_children table
402    async fn add_child_to_parent(&self, parent_id: &NodeId, child_id: &NodeId) -> Result<(), ArborError> {
403        // Get next position for this parent
404        let row = sqlx::query(
405            "SELECT COALESCE(MAX(position), -1) + 1 as next_pos FROM node_children WHERE parent_id = ?",
406        )
407        .bind(parent_id.to_string())
408        .fetch_one(&self.pool)
409        .await
410        .map_err(|e| format!("Failed to get next position: {}", e))?;
411
412        let next_pos: i64 = row.get("next_pos");
413
414        sqlx::query(
415            "INSERT INTO node_children (parent_id, child_id, position) VALUES (?, ?, ?)",
416        )
417        .bind(parent_id.to_string())
418        .bind(child_id.to_string())
419        .bind(next_pos)
420        .execute(&self.pool)
421        .await
422        .map_err(|e| format!("Failed to add child to parent: {}", e))?;
423
424        Ok(())
425    }
426
427    /// Get reference information for a tree
428    async fn get_tree_refs(&self, tree_id: &TreeId) -> Result<ResourceRefs, ArborError> {
429        let rows = sqlx::query(
430            "SELECT owner_id, count FROM tree_refs WHERE tree_id = ?",
431        )
432        .bind(tree_id.to_string())
433        .fetch_all(&self.pool)
434        .await
435        .map_err(|e| format!("Failed to fetch tree refs: {}", e))?;
436
437        let mut owners = HashMap::new();
438        let mut total = 0i64;
439
440        for row in rows {
441            let owner_id: String = row.get("owner_id");
442            let count: i64 = row.get("count");
443            owners.insert(owner_id, count);
444            total += count;
445        }
446
447        Ok(ResourceRefs {
448            ref_count: total,
449            owners,
450        })
451    }
452
453    /// Get reference information for a node
454    async fn get_node_refs(&self, node_id: &NodeId) -> Result<ResourceRefs, ArborError> {
455        let rows = sqlx::query(
456            "SELECT owner_id, count FROM node_refs WHERE node_id = ?",
457        )
458        .bind(node_id.to_string())
459        .fetch_all(&self.pool)
460        .await
461        .map_err(|e| format!("Failed to fetch node refs: {}", e))?;
462
463        let mut owners = HashMap::new();
464        let mut total = 0i64;
465
466        for row in rows {
467            let owner_id: String = row.get("owner_id");
468            let count: i64 = row.get("count");
469            owners.insert(owner_id, count);
470            total += count;
471        }
472
473        Ok(ResourceRefs {
474            ref_count: total,
475            owners,
476        })
477    }
478
479    /// List all tree IDs (active only by default)
480    pub async fn tree_list(&self, include_scheduled: bool) -> Result<Vec<TreeId>, ArborError> {
481        let query = if include_scheduled {
482            "SELECT id FROM trees WHERE state IN ('active', 'scheduled_delete')"
483        } else {
484            "SELECT id FROM trees WHERE state = 'active'"
485        };
486
487        let rows = sqlx::query(query)
488            .fetch_all(&self.pool)
489            .await
490            .map_err(|e| format!("Failed to list trees: {}", e))?;
491
492        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
493            .iter()
494            .map(|row| {
495                let id_str: String = row.get("id");
496                ArborId::parse_str(&id_str)
497                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
498            })
499            .collect();
500
501        tree_ids
502    }
503
504    /// List trees scheduled for deletion
505    pub async fn tree_list_scheduled(&self) -> Result<Vec<TreeId>, ArborError> {
506        let rows = sqlx::query(
507            "SELECT id FROM trees WHERE state = 'scheduled_delete'",
508        )
509        .fetch_all(&self.pool)
510        .await
511        .map_err(|e| format!("Failed to list scheduled trees: {}", e))?;
512
513        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
514            .iter()
515            .map(|row| {
516                let id_str: String = row.get("id");
517                ArborId::parse_str(&id_str)
518                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
519            })
520            .collect();
521
522        tree_ids
523    }
524
525    /// List archived trees
526    pub async fn tree_list_archived(&self) -> Result<Vec<TreeId>, ArborError> {
527        let rows = sqlx::query(
528            "SELECT id FROM trees WHERE state = 'archived'",
529        )
530        .fetch_all(&self.pool)
531        .await
532        .map_err(|e| format!("Failed to list archived trees: {}", e))?;
533
534        let tree_ids: Result<Vec<TreeId>, ArborError> = rows
535            .iter()
536            .map(|row| {
537                let id_str: String = row.get("id");
538                ArborId::parse_str(&id_str)
539                    .map_err(|e| format!("Invalid tree ID: {}", e).into())
540            })
541            .collect();
542
543        tree_ids
544    }
545
546    /// Update tree metadata
547    pub async fn tree_update_metadata(
548        &self,
549        tree_id: &TreeId,
550        metadata: Value,
551    ) -> Result<(), ArborError> {
552        let now = current_timestamp();
553        let metadata_json = serde_json::to_string(&metadata)
554            .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
555
556        sqlx::query(
557            "UPDATE trees SET metadata = ?, updated_at = ? WHERE id = ? AND state = 'active'",
558        )
559        .bind(metadata_json)
560        .bind(now)
561        .bind(tree_id.to_string())
562        .execute(&self.pool)
563        .await
564        .map_err(|e| format!("Failed to update tree metadata: {}", e))?;
565
566        Ok(())
567    }
568
569    /// Claim ownership of a tree (increment reference count)
570    pub async fn tree_claim(
571        &self,
572        tree_id: &TreeId,
573        owner_id: &str,
574        count: i64,
575    ) -> Result<i64, ArborError> {
576        let now = current_timestamp();
577        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
578
579        // Check if tree exists and is claimable (active or scheduled_delete)
580        let tree_row = sqlx::query(
581            "SELECT state, scheduled_deletion_at FROM trees WHERE id = ?",
582        )
583        .bind(tree_id.to_string())
584        .fetch_optional(&mut *tx)
585        .await
586        .map_err(|e| format!("Failed to fetch tree: {}", e))?
587        .ok_or_else(|| format!("Tree not found: {}", tree_id))?;
588
589        let state_str: String = tree_row.get("state");
590        let state = ResourceState::from_str(&state_str).unwrap_or(ResourceState::Active);
591
592        if state == ResourceState::Archived {
593            return Err("Cannot claim archived tree".into());
594        }
595
596        // If scheduled for deletion, reactivate it
597        if state == ResourceState::ScheduledDelete {
598            sqlx::query(
599                "UPDATE trees SET state = 'active', scheduled_deletion_at = NULL, updated_at = ? WHERE id = ?",
600            )
601            .bind(now)
602            .bind(tree_id.to_string())
603            .execute(&mut *tx)
604            .await
605            .map_err(|e| format!("Failed to reactivate tree: {}", e))?;
606        }
607
608        // Update or insert tree_ref
609        sqlx::query(
610            "INSERT INTO tree_refs (tree_id, owner_id, count, claimed_at)
611             VALUES (?, ?, ?, ?)
612             ON CONFLICT(tree_id, owner_id) DO UPDATE SET
613                count = count + excluded.count,
614                claimed_at = excluded.claimed_at",
615        )
616        .bind(tree_id.to_string())
617        .bind(owner_id)
618        .bind(count)
619        .bind(now)
620        .execute(&mut *tx)
621        .await
622        .map_err(|e| format!("Failed to claim tree: {}", e))?;
623
624        // Update tree ref_count
625        sqlx::query(
626            "UPDATE trees SET ref_count = ref_count + ?, updated_at = ? WHERE id = ?",
627        )
628        .bind(count)
629        .bind(now)
630        .bind(tree_id.to_string())
631        .execute(&mut *tx)
632        .await
633        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;
634
635        // Get the new ref_count
636        let new_count_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
637            .bind(tree_id.to_string())
638            .fetch_one(&mut *tx)
639            .await
640            .map_err(|e| format!("Failed to fetch new ref_count: {}", e))?;
641
642        let new_count: i64 = new_count_row.get("ref_count");
643
644        tx.commit().await.map_err(|e| e.to_string())?;
645        Ok(new_count)
646    }
647
648    /// Release ownership of a tree (decrement reference count)
649    pub async fn tree_release(
650        &self,
651        tree_id: &TreeId,
652        owner_id: &str,
653        count: i64,
654    ) -> Result<i64, ArborError> {
655        let now = current_timestamp();
656        let mut tx = self.pool.begin().await.map_err(|e| e.to_string())?;
657
658        // Check current ref count for this owner
659        let owner_ref = sqlx::query(
660            "SELECT count FROM tree_refs WHERE tree_id = ? AND owner_id = ?",
661        )
662        .bind(tree_id.to_string())
663        .bind(owner_id)
664        .fetch_optional(&mut *tx)
665        .await
666        .map_err(|e| format!("Failed to fetch tree ref: {}", e))?
667        .ok_or_else(|| format!("No reference found for owner {}", owner_id))?;
668
669        let current_count: i64 = owner_ref.get("count");
670        if current_count < count {
671            return Err(format!(
672                "Cannot release {} references, owner only has {}",
673                count, current_count
674            )
675            .into());
676        }
677
678        let new_count = current_count - count;
679
680        // Update or delete tree_ref
681        if new_count == 0 {
682            sqlx::query("DELETE FROM tree_refs WHERE tree_id = ? AND owner_id = ?")
683                .bind(tree_id.to_string())
684                .bind(owner_id)
685                .execute(&mut *tx)
686                .await
687                .map_err(|e| format!("Failed to delete tree ref: {}", e))?;
688        } else {
689            sqlx::query(
690                "UPDATE tree_refs SET count = ?, claimed_at = ? WHERE tree_id = ? AND owner_id = ?",
691            )
692            .bind(new_count)
693            .bind(now)
694            .bind(tree_id.to_string())
695            .bind(owner_id)
696            .execute(&mut *tx)
697            .await
698            .map_err(|e| format!("Failed to update tree ref: {}", e))?;
699        }
700
701        // Update tree ref_count
702        sqlx::query(
703            "UPDATE trees SET ref_count = ref_count - ?, updated_at = ? WHERE id = ?",
704        )
705        .bind(count)
706        .bind(now)
707        .bind(tree_id.to_string())
708        .execute(&mut *tx)
709        .await
710        .map_err(|e| format!("Failed to update tree ref_count: {}", e))?;
711
712        // Check if ref_count reached 0, schedule for deletion
713        let tree_row = sqlx::query("SELECT ref_count FROM trees WHERE id = ?")
714            .bind(tree_id.to_string())
715            .fetch_one(&mut *tx)
716            .await
717            .map_err(|e| format!("Failed to fetch tree: {}", e))?;
718
719        let ref_count: i64 = tree_row.get("ref_count");
720        if ref_count == 0 {
721            sqlx::query(
722                "UPDATE trees SET state = 'scheduled_delete', scheduled_deletion_at = ?, updated_at = ? WHERE id = ?",
723            )
724            .bind(now)
725            .bind(now)
726            .bind(tree_id.to_string())
727            .execute(&mut *tx)
728            .await
729            .map_err(|e| format!("Failed to schedule tree deletion: {}", e))?;
730        }
731
732        tx.commit().await.map_err(|e| e.to_string())?;
733        Ok(ref_count)
734    }
735
736    /// Create a text node in a tree
737    pub async fn node_create_text(
738        &self,
739        tree_id: &TreeId,
740        parent: Option<NodeId>,
741        content: String,
742        metadata: Option<Value>,
743    ) -> Result<NodeId, ArborError> {
744        let node_id = NodeId::new();
745        let now = current_timestamp();
746
747        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
748
749        sqlx::query(
750            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, content, metadata, created_at)
751             VALUES (?, ?, ?, 1, 'active', 'text', ?, ?, ?)",
752        )
753        .bind(node_id.to_string())
754        .bind(tree_id.to_string())
755        .bind(parent.map(|p| p.to_string()))
756        .bind(&content)
757        .bind(metadata_json)
758        .bind(now)
759        .execute(&self.pool)
760        .await
761        .map_err(|e| format!("Failed to create text node: {}", e))?;
762
763        // Add to node_children table if parent is specified
764        if let Some(parent_id) = parent {
765            self.add_child_to_parent(&parent_id, &node_id).await?;
766        }
767
768        Ok(node_id)
769    }
770
771    /// Create an external node in a tree
772    pub async fn node_create_external(
773        &self,
774        tree_id: &TreeId,
775        parent: Option<NodeId>,
776        handle: Handle,
777        metadata: Option<Value>,
778    ) -> Result<NodeId, ArborError> {
779        let node_id = NodeId::new();
780        let now = current_timestamp();
781
782        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
783        let meta_json = serde_json::to_string(&handle.meta).unwrap();
784
785        sqlx::query(
786            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
787             VALUES (?, ?, ?, 1, 'active', 'external', ?, ?, ?, ?, ?, ?)",
788        )
789        .bind(node_id.to_string())
790        .bind(tree_id.to_string())
791        .bind(parent.map(|p| p.to_string()))
792        .bind(handle.plugin_id.to_string())
793        .bind(&handle.version)
794        .bind(&handle.method)
795        .bind(&meta_json)
796        .bind(metadata_json)
797        .bind(now)
798        .execute(&self.pool)
799        .await
800        .map_err(|e| format!("Failed to create external node: {}", e))?;
801
802        // Add to node_children table if parent is specified
803        if let Some(parent_id) = parent {
804            self.add_child_to_parent(&parent_id, &node_id).await?;
805        }
806
807        Ok(node_id)
808    }
809
810    /// Create an external node that is already scheduled for deletion (ephemeral)
811    pub async fn node_create_external_ephemeral(
812        &self,
813        tree_id: &TreeId,
814        parent: Option<NodeId>,
815        handle: Handle,
816        metadata: Option<Value>,
817    ) -> Result<NodeId, ArborError> {
818        let node_id = NodeId::new();
819        let now = current_timestamp();
820
821        let metadata_json = metadata.map(|m| serde_json::to_string(&m).unwrap());
822        let meta_json = serde_json::to_string(&handle.meta).unwrap();
823
824        sqlx::query(
825            "INSERT INTO nodes (id, tree_id, parent_id, ref_count, state, scheduled_deletion_at, node_type, handle_plugin_id, handle_version, handle_method, handle_meta, metadata, created_at)
826             VALUES (?, ?, ?, 0, 'scheduled_delete', ?, 'external', ?, ?, ?, ?, ?, ?)",
827        )
828        .bind(node_id.to_string())
829        .bind(tree_id.to_string())
830        .bind(parent.map(|p| p.to_string()))
831        .bind(now) // scheduled_deletion_at = now (will be cleaned up by cleanup_scheduled_trees)
832        .bind(handle.plugin_id.to_string())
833        .bind(&handle.version)
834        .bind(&handle.method)
835        .bind(&meta_json)
836        .bind(metadata_json)
837        .bind(now)
838        .execute(&self.pool)
839        .await
840        .map_err(|e| format!("Failed to create ephemeral external node: {}", e))?;
841
842        // Add to node_children table if parent is specified
843        if let Some(parent_id) = parent {
844            self.add_child_to_parent(&parent_id, &node_id).await?;
845        }
846
847        Ok(node_id)
848    }
849
850    /// Get a node by ID
851    pub async fn node_get(
852        &self,
853        tree_id: &TreeId,
854        node_id: &NodeId,
855    ) -> Result<Node, ArborError> {
856        let nodes = self.get_nodes_for_tree(tree_id).await?;
857        nodes
858            .get(node_id)
859            .cloned()
860            .ok_or_else(|| format!("Node not found: {}", node_id).into())
861    }
862
863    /// Get children of a node
864    pub async fn node_get_children(
865        &self,
866        tree_id: &TreeId,
867        node_id: &NodeId,
868    ) -> Result<Vec<NodeId>, ArborError> {
869        let rows = sqlx::query(
870            "SELECT id FROM nodes WHERE tree_id = ? AND parent_id = ? AND state = 'active'",
871        )
872        .bind(tree_id.to_string())
873        .bind(node_id.to_string())
874        .fetch_all(&self.pool)
875        .await
876        .map_err(|e| format!("Failed to fetch children: {}", e))?;
877
878        let children: Result<Vec<NodeId>, ArborError> = rows
879            .iter()
880            .map(|row| {
881                let id_str: String = row.get("id");
882                ArborId::parse_str(&id_str)
883                    .map_err(|e| format!("Invalid node ID: {}", e).into())
884            })
885            .collect();
886
887        children
888    }
889
890    /// Get parent of a node
891    pub async fn node_get_parent(
892        &self,
893        tree_id: &TreeId,
894        node_id: &NodeId,
895    ) -> Result<Option<NodeId>, ArborError> {
896        let row = sqlx::query(
897            "SELECT parent_id FROM nodes WHERE tree_id = ? AND id = ?",
898        )
899        .bind(tree_id.to_string())
900        .bind(node_id.to_string())
901        .fetch_optional(&self.pool)
902        .await
903        .map_err(|e| format!("Failed to fetch parent: {}", e))?
904        .ok_or_else(|| format!("Node not found: {}", node_id))?;
905
906        let parent_id: Option<String> = row.get("parent_id");
907        match parent_id {
908            Some(id_str) => Ok(Some(
909                ArborId::parse_str(&id_str)
910                    .map_err(|e| format!("Invalid parent ID: {}", e))?,
911            )),
912            None => Ok(None),
913        }
914    }
915
916    /// Get path from root to a node (list of node IDs)
917    pub async fn node_get_path(
918        &self,
919        tree_id: &TreeId,
920        node_id: &NodeId,
921    ) -> Result<Vec<NodeId>, ArborError> {
922        let mut path = Vec::new();
923        let mut current_id = Some(*node_id);
924
925        // Walk up the tree to root
926        while let Some(id) = current_id {
927            path.push(id);
928            current_id = self.node_get_parent(tree_id, &id).await?;
929        }
930
931        // Reverse to get root-to-node path
932        path.reverse();
933        Ok(path)
934    }
935
936    /// List all leaf nodes in a tree
937    pub async fn context_list_leaves(
938        &self,
939        tree_id: &TreeId,
940    ) -> Result<Vec<NodeId>, ArborError> {
941        let rows = sqlx::query(
942            "SELECT id FROM nodes
943             WHERE tree_id = ? AND state = 'active'
944             AND id NOT IN (SELECT DISTINCT parent_id FROM nodes WHERE parent_id IS NOT NULL AND tree_id = ?)",
945        )
946        .bind(tree_id.to_string())
947        .bind(tree_id.to_string())
948        .fetch_all(&self.pool)
949        .await
950        .map_err(|e| format!("Failed to fetch leaf nodes: {}", e))?;
951
952        let leaves: Result<Vec<NodeId>, ArborError> = rows
953            .iter()
954            .map(|row| {
955                let id_str: String = row.get("id");
956                ArborId::parse_str(&id_str)
957                    .map_err(|e| format!("Invalid node ID: {}", e).into())
958            })
959            .collect();
960
961        leaves
962    }
963
964    /// Get the full path data from root to a node (all node data)
965    pub async fn context_get_path(
966        &self,
967        tree_id: &TreeId,
968        node_id: &NodeId,
969    ) -> Result<Vec<Node>, ArborError> {
970        let path_ids = self.node_get_path(tree_id, node_id).await?;
971        let nodes = self.get_nodes_for_tree(tree_id).await?;
972
973        let path_nodes: Result<Vec<Node>, ArborError> = path_ids
974            .iter()
975            .map(|id| {
976                nodes
977                    .get(id)
978                    .cloned()
979                    .ok_or_else(|| format!("Node not found in path: {}", id).into())
980            })
981            .collect();
982
983        path_nodes
984    }
985
986    /// Get all external handles in the path to a node
987    pub async fn context_get_handles(
988        &self,
989        tree_id: &TreeId,
990        node_id: &NodeId,
991    ) -> Result<Vec<Handle>, ArborError> {
992        let path_nodes = self.context_get_path(tree_id, node_id).await?;
993
994        let handles: Vec<Handle> = path_nodes
995            .iter()
996            .filter_map(|node| match &node.data {
997                NodeType::External { handle } => Some(handle.clone()),
998                NodeType::Text { .. } => None,
999            })
1000            .collect();
1001
1002        Ok(handles)
1003    }
1004
1005    /// Cleanup task: Archive trees scheduled for deletion (after 7 days)
1006    pub async fn cleanup_scheduled_trees(&self) -> Result<usize, ArborError> {
1007        let now = current_timestamp();
1008        let seven_days_ago = now - (7 * 24 * 60 * 60);
1009
1010        let result = sqlx::query(
1011            "UPDATE trees
1012             SET state = 'archived', archived_at = ?, updated_at = ?
1013             WHERE state = 'scheduled_delete' AND scheduled_deletion_at < ?",
1014        )
1015        .bind(now)
1016        .bind(now)
1017        .bind(seven_days_ago)
1018        .execute(&self.pool)
1019        .await
1020        .map_err(|e| format!("Failed to archive trees: {}", e))?;
1021
1022        Ok(result.rows_affected() as usize)
1023    }
1024
1025    /// Query trees by metadata filter (e.g., {"type": "orcha_monitor"})
1026    pub async fn tree_query_by_metadata(&self, filter: &Value) -> Result<Vec<TreeId>, ArborError> {
1027        let rows = sqlx::query(
1028            "SELECT id, metadata FROM trees WHERE state = 'active'"
1029        )
1030        .fetch_all(&self.pool)
1031        .await
1032        .map_err(|e| format!("Failed to query trees: {}", e))?;
1033
1034        let mut matching_trees = Vec::new();
1035
1036        for row in rows {
1037            let tree_id_str: String = row.get("id");
1038            let metadata_str: Option<String> = row.get("metadata");
1039
1040            if let Some(metadata_json) = metadata_str {
1041                if let Ok(metadata) = serde_json::from_str::<Value>(&metadata_json) {
1042                    // Check if metadata matches filter
1043                    if metadata_matches(filter, &metadata) {
1044                        if let Ok(tree_id) = ArborId::parse_str(&tree_id_str) {
1045                            matching_trees.push(tree_id);
1046                        }
1047                    }
1048                }
1049            }
1050        }
1051
1052        Ok(matching_trees)
1053    }
1054}
1055
1056/// Helper to check if metadata matches a filter
1057fn metadata_matches(filter: &Value, metadata: &Value) -> bool {
1058    match (filter, metadata) {
1059        (Value::Object(filter_obj), Value::Object(meta_obj)) => {
1060            // Check if all filter keys exist in metadata with matching values
1061            filter_obj.iter().all(|(key, filter_value)| {
1062                meta_obj.get(key).map_or(false, |meta_value| {
1063                    metadata_matches(filter_value, meta_value)
1064                })
1065            })
1066        }
1067        (Value::Array(filter_arr), Value::Array(meta_arr)) => {
1068            // Arrays must match exactly (for simplicity)
1069            filter_arr == meta_arr
1070        }
1071        (filter_val, meta_val) => filter_val == meta_val,
1072    }
1073}
1074
1075/// Get current Unix timestamp in seconds
1076fn current_timestamp() -> i64 {
1077    SystemTime::now()
1078        .duration_since(UNIX_EPOCH)
1079        .unwrap()
1080        .as_secs() as i64
1081}