Skip to main content

plexus_substrate/activations/arbor/
activation.rs

1use super::storage::{ArborConfig, ArborStorage};
2use super::types::{ArborEvent, Handle, NodeId, TreeId, TreeSkeleton};
3use crate::plexus::{HubContext, NoParent, PlexusStreamItem};
4use async_stream::stream;
5use futures::{Stream, StreamExt};
6use plexus_macros::hub_methods;
7use serde_json::Value;
8use std::marker::PhantomData;
9use std::sync::{Arc, OnceLock};
10
/// Arbor activation - manages conversation trees
///
/// Generic over `P: HubContext` to support parent context injection for
/// resolving handles when rendering trees. `P` defaults to `NoParent`.
///
/// Both `storage` and `hub` sit behind `Arc`, so clones of an `Arbor` share
/// the same storage instance and the same parent-context slot.
#[derive(Clone)]
pub struct Arbor<P: HubContext = NoParent> {
    /// Shared handle to the tree/node storage backing every method.
    storage: Arc<ArborStorage>,
    /// Hub reference for resolving handles when rendering trees
    ///
    /// Starts empty; filled by `inject_parent`. `OnceLock` allows it to be
    /// set at most once.
    hub: Arc<OnceLock<P>>,
    /// Zero-sized marker for the context type `P`.
    _phantom: PhantomData<P>,
}
22
23impl<P: HubContext> Arbor<P> {
24    /// Create a new Arbor activation with its own storage and specific context type
25    pub async fn with_context_type(config: ArborConfig) -> Result<Self, String> {
26        let storage = ArborStorage::new(config)
27            .await
28            .map_err(|e| format!("Failed to initialize Arbor storage: {}", e.message))?;
29
30        Ok(Self {
31            storage: Arc::new(storage),
32            hub: Arc::new(OnceLock::new()),
33            _phantom: PhantomData,
34        })
35    }
36
37    /// Create an Arbor activation with a shared storage instance
38    pub fn with_storage(storage: Arc<ArborStorage>) -> Self {
39        Self {
40            storage,
41            hub: Arc::new(OnceLock::new()),
42            _phantom: PhantomData,
43        }
44    }
45
46    /// Get the underlying storage (for sharing with other activations)
47    pub fn storage(&self) -> Arc<ArborStorage> {
48        self.storage.clone()
49    }
50
51    /// Inject parent context for resolving handles
52    ///
53    /// Called during hub construction (e.g., via Arc::new_cyclic for DynamicHub).
54    pub fn inject_parent(&self, parent: P) {
55        let _ = self.hub.set(parent);
56    }
57
58    /// Check if parent context has been injected
59    pub fn has_parent(&self) -> bool {
60        self.hub.get().is_some()
61    }
62
63    /// Get a reference to the parent context
64    pub fn parent(&self) -> Option<&P> {
65        self.hub.get()
66    }
67}
68
69/// Convenience constructor for Arbor with NoParent (standalone/testing)
70impl Arbor<NoParent> {
71    pub async fn new(config: ArborConfig) -> Result<Self, String> {
72        Self::with_context_type(config).await
73    }
74}
75
76#[hub_methods(
77    namespace = "arbor",
78    version = "1.0.0",
79    description = "Manage conversation trees with context tracking"
80)]
81impl<P: HubContext> Arbor<P> {
82    /// Create a new conversation tree
83    #[plexus_macros::hub_method(params(
84        metadata = "Optional tree-level metadata (name, description, etc.)",
85        owner_id = "Owner identifier (default: 'system')"
86    ))]
87    async fn tree_create(
88        &self,
89        metadata: Option<Value>,
90        owner_id: String,
91    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
92        let storage = self.storage.clone();
93        stream! {
94            match storage.tree_create(metadata, &owner_id).await {
95                Ok(tree_id) => yield ArborEvent::TreeCreated { tree_id },
96                Err(e) => {
97                    eprintln!("Error creating tree: {}", e.message);
98                    yield ArborEvent::TreeCreated { tree_id: TreeId::nil() };
99                }
100            }
101        }
102    }
103
104    /// Retrieve a complete tree with all nodes
105    #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to retrieve"))]
106    async fn tree_get(&self, tree_id: TreeId) -> impl Stream<Item = ArborEvent> + Send + 'static {
107        let storage = self.storage.clone();
108        stream! {
109            match storage.tree_get(&tree_id).await {
110                Ok(tree) => yield ArborEvent::TreeData { tree },
111                Err(e) => {
112                    eprintln!("Error getting tree: {}", e.message);
113                    yield ArborEvent::TreeList { tree_ids: vec![] };
114                }
115            }
116        }
117    }
118
119    /// Get lightweight tree structure without node data
120    #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to retrieve"))]
121    async fn tree_get_skeleton(
122        &self,
123        tree_id: TreeId,
124    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
125        let storage = self.storage.clone();
126        stream! {
127            match storage.tree_get(&tree_id).await {
128                Ok(tree) => yield ArborEvent::TreeSkeleton { skeleton: TreeSkeleton::from(&tree) },
129                Err(e) => {
130                    eprintln!("Error getting tree skeleton: {}", e.message);
131                    yield ArborEvent::TreeList { tree_ids: vec![] };
132                }
133            }
134        }
135    }
136
137    /// List all active trees
138    #[plexus_macros::hub_method]
139    async fn tree_list(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
140        let storage = self.storage.clone();
141        stream! {
142            match storage.tree_list(false).await {
143                Ok(tree_ids) => yield ArborEvent::TreeList { tree_ids },
144                Err(e) => {
145                    eprintln!("Error listing trees: {}", e.message);
146                    yield ArborEvent::TreeList { tree_ids: vec![] };
147                }
148            }
149        }
150    }
151
152    /// Update tree metadata
153    #[plexus_macros::hub_method(params(
154        tree_id = "UUID of the tree to update",
155        metadata = "New metadata to set"
156    ))]
157    async fn tree_update_metadata(
158        &self,
159        tree_id: TreeId,
160        metadata: Value,
161    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
162        let storage = self.storage.clone();
163        stream! {
164            match storage.tree_update_metadata(&tree_id, metadata).await {
165                Ok(_) => yield ArborEvent::TreeUpdated { tree_id },
166                Err(e) => {
167                    eprintln!("Error updating tree metadata: {}", e.message);
168                    yield ArborEvent::TreeList { tree_ids: vec![] };
169                }
170            }
171        }
172    }
173
174    /// Claim ownership of a tree (increment reference count)
175    #[plexus_macros::hub_method(params(
176        tree_id = "UUID of the tree to claim",
177        owner_id = "Owner identifier",
178        count = "Number of references to add (default: 1)"
179    ))]
180    async fn tree_claim(
181        &self,
182        tree_id: TreeId,
183        owner_id: String,
184        count: i64,
185    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
186        let storage = self.storage.clone();
187        stream! {
188            match storage.tree_claim(&tree_id, &owner_id, count).await {
189                Ok(new_count) => yield ArborEvent::TreeClaimed { tree_id, owner_id, new_count },
190                Err(e) => {
191                    eprintln!("Error claiming tree: {}", e.message);
192                    yield ArborEvent::TreeList { tree_ids: vec![] };
193                }
194            }
195        }
196    }
197
198    /// Release ownership of a tree (decrement reference count)
199    #[plexus_macros::hub_method(params(
200        tree_id = "UUID of the tree to release",
201        owner_id = "Owner identifier",
202        count = "Number of references to remove (default: 1)"
203    ))]
204    async fn tree_release(
205        &self,
206        tree_id: TreeId,
207        owner_id: String,
208        count: i64,
209    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
210        let storage = self.storage.clone();
211        stream! {
212            match storage.tree_release(&tree_id, &owner_id, count).await {
213                Ok(new_count) => yield ArborEvent::TreeReleased { tree_id, owner_id, new_count },
214                Err(e) => {
215                    eprintln!("Error releasing tree: {}", e.message);
216                    yield ArborEvent::TreeList { tree_ids: vec![] };
217                }
218            }
219        }
220    }
221
222    /// List trees scheduled for deletion
223    #[plexus_macros::hub_method]
224    async fn tree_list_scheduled(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
225        let storage = self.storage.clone();
226        stream! {
227            match storage.tree_list(true).await {
228                Ok(tree_ids) => yield ArborEvent::TreesScheduled { tree_ids },
229                Err(e) => {
230                    eprintln!("Error listing scheduled trees: {}", e.message);
231                    yield ArborEvent::TreesScheduled { tree_ids: vec![] };
232                }
233            }
234        }
235    }
236
237    /// List archived trees
238    #[plexus_macros::hub_method]
239    async fn tree_list_archived(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
240        let storage = self.storage.clone();
241        stream! {
242            match storage.tree_list(true).await {
243                Ok(tree_ids) => yield ArborEvent::TreesArchived { tree_ids },
244                Err(e) => {
245                    eprintln!("Error listing archived trees: {}", e.message);
246                    yield ArborEvent::TreesArchived { tree_ids: vec![] };
247                }
248            }
249        }
250    }
251
252    /// Create a text node in a tree
253    #[plexus_macros::hub_method(params(
254        tree_id = "UUID of the tree",
255        parent = "Parent node ID (None for root-level)",
256        content = "Text content for the node",
257        metadata = "Optional node metadata"
258    ))]
259    async fn node_create_text(
260        &self,
261        tree_id: TreeId,
262        parent: Option<NodeId>,
263        content: String,
264        metadata: Option<Value>,
265    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
266        let storage = self.storage.clone();
267        stream! {
268            match storage.node_create_text(&tree_id, parent, content, metadata).await {
269                Ok(node_id) => yield ArborEvent::NodeCreated { tree_id, node_id, parent },
270                Err(e) => {
271                    eprintln!("Error creating text node: {}", e.message);
272                    yield ArborEvent::TreeList { tree_ids: vec![] };
273                }
274            }
275        }
276    }
277
278    /// Create an external node in a tree
279    #[plexus_macros::hub_method(params(
280        tree_id = "UUID of the tree",
281        parent = "Parent node ID (None for root-level)",
282        handle = "Handle to external data",
283        metadata = "Optional node metadata"
284    ))]
285    async fn node_create_external(
286        &self,
287        tree_id: TreeId,
288        parent: Option<NodeId>,
289        handle: Handle,
290        metadata: Option<Value>,
291    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
292        let storage = self.storage.clone();
293        stream! {
294            match storage.node_create_external(&tree_id, parent, handle, metadata).await {
295                Ok(node_id) => yield ArborEvent::NodeCreated { tree_id, node_id, parent },
296                Err(e) => {
297                    eprintln!("Error creating external node: {}", e.message);
298                    yield ArborEvent::TreeList { tree_ids: vec![] };
299                }
300            }
301        }
302    }
303
304    /// Get a node by ID
305    #[plexus_macros::hub_method(params(
306        tree_id = "UUID of the tree",
307        node_id = "UUID of the node"
308    ))]
309    async fn node_get(
310        &self,
311        tree_id: TreeId,
312        node_id: NodeId,
313    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
314        let storage = self.storage.clone();
315        stream! {
316            match storage.node_get(&tree_id, &node_id).await {
317                Ok(node) => yield ArborEvent::NodeData { tree_id, node },
318                Err(e) => {
319                    eprintln!("Error getting node: {}", e.message);
320                    yield ArborEvent::TreeList { tree_ids: vec![] };
321                }
322            }
323        }
324    }
325
326    /// Get the children of a node
327    #[plexus_macros::hub_method(params(
328        tree_id = "UUID of the tree",
329        node_id = "UUID of the node"
330    ))]
331    async fn node_get_children(
332        &self,
333        tree_id: TreeId,
334        node_id: NodeId,
335    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
336        let storage = self.storage.clone();
337        stream! {
338            match storage.node_get_children(&tree_id, &node_id).await {
339                Ok(children) => yield ArborEvent::NodeChildren { tree_id, node_id, children },
340                Err(e) => {
341                    eprintln!("Error getting node children: {}", e.message);
342                    yield ArborEvent::TreeList { tree_ids: vec![] };
343                }
344            }
345        }
346    }
347
348    /// Get the parent of a node
349    #[plexus_macros::hub_method(params(
350        tree_id = "UUID of the tree",
351        node_id = "UUID of the node"
352    ))]
353    async fn node_get_parent(
354        &self,
355        tree_id: TreeId,
356        node_id: NodeId,
357    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
358        let storage = self.storage.clone();
359        stream! {
360            match storage.node_get_parent(&tree_id, &node_id).await {
361                Ok(parent) => yield ArborEvent::NodeParent { tree_id, node_id, parent },
362                Err(e) => {
363                    eprintln!("Error getting node parent: {}", e.message);
364                    yield ArborEvent::TreeList { tree_ids: vec![] };
365                }
366            }
367        }
368    }
369
370    /// Get the path from root to a node
371    #[plexus_macros::hub_method(params(
372        tree_id = "UUID of the tree",
373        node_id = "UUID of the node"
374    ))]
375    async fn node_get_path(
376        &self,
377        tree_id: TreeId,
378        node_id: NodeId,
379    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
380        let storage = self.storage.clone();
381        stream! {
382            match storage.node_get_path(&tree_id, &node_id).await {
383                Ok(path) => yield ArborEvent::ContextPath { tree_id, path },
384                Err(e) => {
385                    eprintln!("Error getting node path: {}", e.message);
386                    yield ArborEvent::TreeList { tree_ids: vec![] };
387                }
388            }
389        }
390    }
391
392    /// List all leaf nodes in a tree
393    #[plexus_macros::hub_method(params(tree_id = "UUID of the tree"))]
394    async fn context_list_leaves(
395        &self,
396        tree_id: TreeId,
397    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
398        let storage = self.storage.clone();
399        stream! {
400            match storage.context_list_leaves(&tree_id).await {
401                Ok(leaves) => yield ArborEvent::ContextLeaves { tree_id, leaves },
402                Err(e) => {
403                    eprintln!("Error listing leaf nodes: {}", e.message);
404                    yield ArborEvent::TreeList { tree_ids: vec![] };
405                }
406            }
407        }
408    }
409
410    /// Get the full path data from root to a node
411    #[plexus_macros::hub_method(params(
412        tree_id = "UUID of the tree",
413        node_id = "UUID of the target node"
414    ))]
415    async fn context_get_path(
416        &self,
417        tree_id: TreeId,
418        node_id: NodeId,
419    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
420        let storage = self.storage.clone();
421        stream! {
422            match storage.context_get_path(&tree_id, &node_id).await {
423                Ok(nodes) => yield ArborEvent::ContextPathData { tree_id, nodes },
424                Err(e) => {
425                    eprintln!("Error getting context path: {}", e.message);
426                    yield ArborEvent::TreeList { tree_ids: vec![] };
427                }
428            }
429        }
430    }
431
432    /// Get all external handles in the path to a node
433    #[plexus_macros::hub_method(params(
434        tree_id = "UUID of the tree",
435        node_id = "UUID of the target node"
436    ))]
437    async fn context_get_handles(
438        &self,
439        tree_id: TreeId,
440        node_id: NodeId,
441    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
442        let storage = self.storage.clone();
443        stream! {
444            match storage.context_get_handles(&tree_id, &node_id).await {
445                Ok(handles) => yield ArborEvent::ContextHandles { tree_id, handles },
446                Err(e) => {
447                    eprintln!("Error getting context handles: {}", e.message);
448                    yield ArborEvent::TreeList { tree_ids: vec![] };
449                }
450            }
451        }
452    }
453
454    /// Render tree as text visualization
455    ///
456    /// If parent context is available, automatically resolves handles to show
457    /// actual content. Otherwise, shows handle references.
458    #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to render"))]
459    async fn tree_render(
460        &self,
461        tree_id: TreeId,
462    ) -> impl Stream<Item = ArborEvent> + Send + 'static {
463        let storage = self.storage.clone();
464        let hub = self.hub.clone();
465
466        stream! {
467            match storage.tree_get(&tree_id).await {
468                Ok(tree) => {
469                    // Check if we have parent context for handle resolution
470                    let render = if let Some(parent) = hub.get() {
471                        // Resolve handles through parent context
472                        tree.render_resolved(|handle| {
473                            let parent = parent.clone();
474                            let handle = handle.clone();
475                            async move {
476                                resolve_handle_to_string(&parent, &handle).await
477                            }
478                        }).await
479                    } else {
480                        // No parent context - use simple render (shows handle references)
481                        tree.render()
482                    };
483                    yield ArborEvent::TreeRender { tree_id, render };
484                }
485                Err(e) => {
486                    eprintln!("Error rendering tree: {}", e.message);
487                    yield ArborEvent::TreeRender { tree_id, render: format!("Error: {}", e.message) };
488                }
489            }
490        }
491    }
492}
493
494/// Resolve a handle through HubContext and extract a display string
495async fn resolve_handle_to_string<P: HubContext>(parent: &P, handle: &Handle) -> String {
496    match parent.resolve_handle(handle).await {
497        Ok(mut stream) => {
498            // Collect the first data item from the stream
499            while let Some(item) = stream.next().await {
500                match item {
501                    PlexusStreamItem::Data { content, .. } => {
502                        // Try to extract a meaningful display string from the resolved content
503                        return extract_display_content(&content);
504                    }
505                    PlexusStreamItem::Error { message, .. } => {
506                        return format!("[error: {}]", message);
507                    }
508                    PlexusStreamItem::Done { .. } => break,
509                    _ => continue,
510                }
511            }
512            format!("[empty: {}]", handle)
513        }
514        Err(e) => {
515            format!("[unresolved: {} - {}]", handle.method, e)
516        }
517    }
518}
519
520/// Extract display content from resolved handle data
521fn extract_display_content(content: &Value) -> String {
522    // Try common patterns for resolved content
523
524    // Pattern 1: { "type": "message", "role": "...", "content": "..." }
525    if let Some(msg_content) = content.get("content").and_then(|v| v.as_str()) {
526        let role = content.get("role").and_then(|v| v.as_str()).unwrap_or("unknown");
527        let name = content.get("name").and_then(|v| v.as_str());
528
529        let truncated = if msg_content.len() > 60 {
530            format!("{}...", &msg_content[..57])
531        } else {
532            msg_content.to_string()
533        };
534
535        return if let Some(n) = name {
536            format!("[{}:{}] {}", role, n, truncated.replace('\n', "↵"))
537        } else {
538            format!("[{}] {}", role, truncated.replace('\n', "↵"))
539        };
540    }
541
542    // Pattern 2: { "type": "...", ... } - use type as label
543    if let Some(type_str) = content.get("type").and_then(|v| v.as_str()) {
544        return format!("[{}]", type_str);
545    }
546
547    // Fallback: show truncated JSON
548    let json_str = content.to_string();
549    if json_str.len() > 50 {
550        format!("{}...", &json_str[..47])
551    } else {
552        json_str
553    }
554}