1use super::storage::{ArborConfig, ArborStorage};
2use super::types::{ArborEvent, Handle, NodeId, TreeId, TreeSkeleton};
3use crate::plexus::{HubContext, NoParent, PlexusStreamItem};
4use async_stream::stream;
5use futures::{Stream, StreamExt};
6use plexus_macros::hub_methods;
7use serde_json::Value;
8use std::marker::PhantomData;
9use std::sync::{Arc, OnceLock};
10
11#[derive(Clone)]
16pub struct Arbor<P: HubContext = NoParent> {
17 storage: Arc<ArborStorage>,
18 hub: Arc<OnceLock<P>>,
20 _phantom: PhantomData<P>,
21}
22
23impl<P: HubContext> Arbor<P> {
24 pub async fn with_context_type(config: ArborConfig) -> Result<Self, String> {
26 let storage = ArborStorage::new(config)
27 .await
28 .map_err(|e| format!("Failed to initialize Arbor storage: {}", e.message))?;
29
30 Ok(Self {
31 storage: Arc::new(storage),
32 hub: Arc::new(OnceLock::new()),
33 _phantom: PhantomData,
34 })
35 }
36
37 pub fn with_storage(storage: Arc<ArborStorage>) -> Self {
39 Self {
40 storage,
41 hub: Arc::new(OnceLock::new()),
42 _phantom: PhantomData,
43 }
44 }
45
46 pub fn storage(&self) -> Arc<ArborStorage> {
48 self.storage.clone()
49 }
50
51 pub fn inject_parent(&self, parent: P) {
55 let _ = self.hub.set(parent);
56 }
57
58 pub fn has_parent(&self) -> bool {
60 self.hub.get().is_some()
61 }
62
63 pub fn parent(&self) -> Option<&P> {
65 self.hub.get()
66 }
67}
68
69impl Arbor<NoParent> {
71 pub async fn new(config: ArborConfig) -> Result<Self, String> {
72 Self::with_context_type(config).await
73 }
74}
75
76#[hub_methods(
77 namespace = "arbor",
78 version = "1.0.0",
79 description = "Manage conversation trees with context tracking"
80)]
81impl<P: HubContext> Arbor<P> {
82 #[plexus_macros::hub_method(params(
84 metadata = "Optional tree-level metadata (name, description, etc.)",
85 owner_id = "Owner identifier (default: 'system')"
86 ))]
87 async fn tree_create(
88 &self,
89 metadata: Option<Value>,
90 owner_id: String,
91 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
92 let storage = self.storage.clone();
93 stream! {
94 match storage.tree_create(metadata, &owner_id).await {
95 Ok(tree_id) => yield ArborEvent::TreeCreated { tree_id },
96 Err(e) => {
97 eprintln!("Error creating tree: {}", e.message);
98 yield ArborEvent::TreeCreated { tree_id: TreeId::nil() };
99 }
100 }
101 }
102 }
103
104 #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to retrieve"))]
106 async fn tree_get(&self, tree_id: TreeId) -> impl Stream<Item = ArborEvent> + Send + 'static {
107 let storage = self.storage.clone();
108 stream! {
109 match storage.tree_get(&tree_id).await {
110 Ok(tree) => yield ArborEvent::TreeData { tree },
111 Err(e) => {
112 eprintln!("Error getting tree: {}", e.message);
113 yield ArborEvent::TreeList { tree_ids: vec![] };
114 }
115 }
116 }
117 }
118
119 #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to retrieve"))]
121 async fn tree_get_skeleton(
122 &self,
123 tree_id: TreeId,
124 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
125 let storage = self.storage.clone();
126 stream! {
127 match storage.tree_get(&tree_id).await {
128 Ok(tree) => yield ArborEvent::TreeSkeleton { skeleton: TreeSkeleton::from(&tree) },
129 Err(e) => {
130 eprintln!("Error getting tree skeleton: {}", e.message);
131 yield ArborEvent::TreeList { tree_ids: vec![] };
132 }
133 }
134 }
135 }
136
137 #[plexus_macros::hub_method]
139 async fn tree_list(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
140 let storage = self.storage.clone();
141 stream! {
142 match storage.tree_list(false).await {
143 Ok(tree_ids) => yield ArborEvent::TreeList { tree_ids },
144 Err(e) => {
145 eprintln!("Error listing trees: {}", e.message);
146 yield ArborEvent::TreeList { tree_ids: vec![] };
147 }
148 }
149 }
150 }
151
152 #[plexus_macros::hub_method(params(
154 tree_id = "UUID of the tree to update",
155 metadata = "New metadata to set"
156 ))]
157 async fn tree_update_metadata(
158 &self,
159 tree_id: TreeId,
160 metadata: Value,
161 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
162 let storage = self.storage.clone();
163 stream! {
164 match storage.tree_update_metadata(&tree_id, metadata).await {
165 Ok(_) => yield ArborEvent::TreeUpdated { tree_id },
166 Err(e) => {
167 eprintln!("Error updating tree metadata: {}", e.message);
168 yield ArborEvent::TreeList { tree_ids: vec![] };
169 }
170 }
171 }
172 }
173
174 #[plexus_macros::hub_method(params(
176 tree_id = "UUID of the tree to claim",
177 owner_id = "Owner identifier",
178 count = "Number of references to add (default: 1)"
179 ))]
180 async fn tree_claim(
181 &self,
182 tree_id: TreeId,
183 owner_id: String,
184 count: i64,
185 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
186 let storage = self.storage.clone();
187 stream! {
188 match storage.tree_claim(&tree_id, &owner_id, count).await {
189 Ok(new_count) => yield ArborEvent::TreeClaimed { tree_id, owner_id, new_count },
190 Err(e) => {
191 eprintln!("Error claiming tree: {}", e.message);
192 yield ArborEvent::TreeList { tree_ids: vec![] };
193 }
194 }
195 }
196 }
197
198 #[plexus_macros::hub_method(params(
200 tree_id = "UUID of the tree to release",
201 owner_id = "Owner identifier",
202 count = "Number of references to remove (default: 1)"
203 ))]
204 async fn tree_release(
205 &self,
206 tree_id: TreeId,
207 owner_id: String,
208 count: i64,
209 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
210 let storage = self.storage.clone();
211 stream! {
212 match storage.tree_release(&tree_id, &owner_id, count).await {
213 Ok(new_count) => yield ArborEvent::TreeReleased { tree_id, owner_id, new_count },
214 Err(e) => {
215 eprintln!("Error releasing tree: {}", e.message);
216 yield ArborEvent::TreeList { tree_ids: vec![] };
217 }
218 }
219 }
220 }
221
222 #[plexus_macros::hub_method]
224 async fn tree_list_scheduled(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
225 let storage = self.storage.clone();
226 stream! {
227 match storage.tree_list(true).await {
228 Ok(tree_ids) => yield ArborEvent::TreesScheduled { tree_ids },
229 Err(e) => {
230 eprintln!("Error listing scheduled trees: {}", e.message);
231 yield ArborEvent::TreesScheduled { tree_ids: vec![] };
232 }
233 }
234 }
235 }
236
237 #[plexus_macros::hub_method]
239 async fn tree_list_archived(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
240 let storage = self.storage.clone();
241 stream! {
242 match storage.tree_list(true).await {
243 Ok(tree_ids) => yield ArborEvent::TreesArchived { tree_ids },
244 Err(e) => {
245 eprintln!("Error listing archived trees: {}", e.message);
246 yield ArborEvent::TreesArchived { tree_ids: vec![] };
247 }
248 }
249 }
250 }
251
252 #[plexus_macros::hub_method(params(
254 tree_id = "UUID of the tree",
255 parent = "Parent node ID (None for root-level)",
256 content = "Text content for the node",
257 metadata = "Optional node metadata"
258 ))]
259 async fn node_create_text(
260 &self,
261 tree_id: TreeId,
262 parent: Option<NodeId>,
263 content: String,
264 metadata: Option<Value>,
265 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
266 let storage = self.storage.clone();
267 stream! {
268 match storage.node_create_text(&tree_id, parent, content, metadata).await {
269 Ok(node_id) => yield ArborEvent::NodeCreated { tree_id, node_id, parent },
270 Err(e) => {
271 eprintln!("Error creating text node: {}", e.message);
272 yield ArborEvent::TreeList { tree_ids: vec![] };
273 }
274 }
275 }
276 }
277
278 #[plexus_macros::hub_method(params(
280 tree_id = "UUID of the tree",
281 parent = "Parent node ID (None for root-level)",
282 handle = "Handle to external data",
283 metadata = "Optional node metadata"
284 ))]
285 async fn node_create_external(
286 &self,
287 tree_id: TreeId,
288 parent: Option<NodeId>,
289 handle: Handle,
290 metadata: Option<Value>,
291 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
292 let storage = self.storage.clone();
293 stream! {
294 match storage.node_create_external(&tree_id, parent, handle, metadata).await {
295 Ok(node_id) => yield ArborEvent::NodeCreated { tree_id, node_id, parent },
296 Err(e) => {
297 eprintln!("Error creating external node: {}", e.message);
298 yield ArborEvent::TreeList { tree_ids: vec![] };
299 }
300 }
301 }
302 }
303
304 #[plexus_macros::hub_method(params(
306 tree_id = "UUID of the tree",
307 node_id = "UUID of the node"
308 ))]
309 async fn node_get(
310 &self,
311 tree_id: TreeId,
312 node_id: NodeId,
313 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
314 let storage = self.storage.clone();
315 stream! {
316 match storage.node_get(&tree_id, &node_id).await {
317 Ok(node) => yield ArborEvent::NodeData { tree_id, node },
318 Err(e) => {
319 eprintln!("Error getting node: {}", e.message);
320 yield ArborEvent::TreeList { tree_ids: vec![] };
321 }
322 }
323 }
324 }
325
326 #[plexus_macros::hub_method(params(
328 tree_id = "UUID of the tree",
329 node_id = "UUID of the node"
330 ))]
331 async fn node_get_children(
332 &self,
333 tree_id: TreeId,
334 node_id: NodeId,
335 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
336 let storage = self.storage.clone();
337 stream! {
338 match storage.node_get_children(&tree_id, &node_id).await {
339 Ok(children) => yield ArborEvent::NodeChildren { tree_id, node_id, children },
340 Err(e) => {
341 eprintln!("Error getting node children: {}", e.message);
342 yield ArborEvent::TreeList { tree_ids: vec![] };
343 }
344 }
345 }
346 }
347
348 #[plexus_macros::hub_method(params(
350 tree_id = "UUID of the tree",
351 node_id = "UUID of the node"
352 ))]
353 async fn node_get_parent(
354 &self,
355 tree_id: TreeId,
356 node_id: NodeId,
357 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
358 let storage = self.storage.clone();
359 stream! {
360 match storage.node_get_parent(&tree_id, &node_id).await {
361 Ok(parent) => yield ArborEvent::NodeParent { tree_id, node_id, parent },
362 Err(e) => {
363 eprintln!("Error getting node parent: {}", e.message);
364 yield ArborEvent::TreeList { tree_ids: vec![] };
365 }
366 }
367 }
368 }
369
370 #[plexus_macros::hub_method(params(
372 tree_id = "UUID of the tree",
373 node_id = "UUID of the node"
374 ))]
375 async fn node_get_path(
376 &self,
377 tree_id: TreeId,
378 node_id: NodeId,
379 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
380 let storage = self.storage.clone();
381 stream! {
382 match storage.node_get_path(&tree_id, &node_id).await {
383 Ok(path) => yield ArborEvent::ContextPath { tree_id, path },
384 Err(e) => {
385 eprintln!("Error getting node path: {}", e.message);
386 yield ArborEvent::TreeList { tree_ids: vec![] };
387 }
388 }
389 }
390 }
391
392 #[plexus_macros::hub_method(params(tree_id = "UUID of the tree"))]
394 async fn context_list_leaves(
395 &self,
396 tree_id: TreeId,
397 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
398 let storage = self.storage.clone();
399 stream! {
400 match storage.context_list_leaves(&tree_id).await {
401 Ok(leaves) => yield ArborEvent::ContextLeaves { tree_id, leaves },
402 Err(e) => {
403 eprintln!("Error listing leaf nodes: {}", e.message);
404 yield ArborEvent::TreeList { tree_ids: vec![] };
405 }
406 }
407 }
408 }
409
410 #[plexus_macros::hub_method(params(
412 tree_id = "UUID of the tree",
413 node_id = "UUID of the target node"
414 ))]
415 async fn context_get_path(
416 &self,
417 tree_id: TreeId,
418 node_id: NodeId,
419 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
420 let storage = self.storage.clone();
421 stream! {
422 match storage.context_get_path(&tree_id, &node_id).await {
423 Ok(nodes) => yield ArborEvent::ContextPathData { tree_id, nodes },
424 Err(e) => {
425 eprintln!("Error getting context path: {}", e.message);
426 yield ArborEvent::TreeList { tree_ids: vec![] };
427 }
428 }
429 }
430 }
431
432 #[plexus_macros::hub_method(params(
434 tree_id = "UUID of the tree",
435 node_id = "UUID of the target node"
436 ))]
437 async fn context_get_handles(
438 &self,
439 tree_id: TreeId,
440 node_id: NodeId,
441 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
442 let storage = self.storage.clone();
443 stream! {
444 match storage.context_get_handles(&tree_id, &node_id).await {
445 Ok(handles) => yield ArborEvent::ContextHandles { tree_id, handles },
446 Err(e) => {
447 eprintln!("Error getting context handles: {}", e.message);
448 yield ArborEvent::TreeList { tree_ids: vec![] };
449 }
450 }
451 }
452 }
453
454 #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to render"))]
459 async fn tree_render(
460 &self,
461 tree_id: TreeId,
462 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
463 let storage = self.storage.clone();
464 let hub = self.hub.clone();
465
466 stream! {
467 match storage.tree_get(&tree_id).await {
468 Ok(tree) => {
469 let render = if let Some(parent) = hub.get() {
471 tree.render_resolved(|handle| {
473 let parent = parent.clone();
474 let handle = handle.clone();
475 async move {
476 resolve_handle_to_string(&parent, &handle).await
477 }
478 }).await
479 } else {
480 tree.render()
482 };
483 yield ArborEvent::TreeRender { tree_id, render };
484 }
485 Err(e) => {
486 eprintln!("Error rendering tree: {}", e.message);
487 yield ArborEvent::TreeRender { tree_id, render: format!("Error: {}", e.message) };
488 }
489 }
490 }
491 }
492
493 async fn view_create(
495 &self,
496 source_tree_id: TreeId,
497 owner_id: String,
498 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
499 let storage = self.storage.clone();
500
501 stream! {
502 match storage.view_create(&source_tree_id, &owner_id).await {
503 Ok(view_tree_id) => {
504 yield ArborEvent::ViewCreated {
505 view_tree_id,
506 source_tree_id,
507 };
508 }
509 Err(e) => {
510 eprintln!("Error creating view: {}", e.message);
511 yield ArborEvent::Err { message: e.to_string() };
512 }
513 }
514 }
515 }
516
517 async fn view_add_range(
519 &self,
520 view_tree_id: TreeId,
521 parent_node: NodeId,
522 range_spec: crate::activations::arbor::RangeSpec,
523 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
524 let storage = self.storage.clone();
525
526 stream! {
527 match storage.view_add_range(&view_tree_id, &parent_node, range_spec).await {
528 Ok(range_node_id) => {
529 yield ArborEvent::RangeAdded {
530 view_tree_id,
531 range_node_id,
532 };
533 }
534 Err(e) => {
535 eprintln!("Error adding range: {}", e.message);
536 yield ArborEvent::Err { message: e.to_string() };
537 }
538 }
539 }
540 }
541
542 async fn view_detect_text_runs(
544 &self,
545 tree_id: TreeId,
546 min_length: u32,
547 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
548 let storage = self.storage.clone();
549
550 stream! {
551 match storage.view_detect_text_runs(&tree_id, min_length as usize).await {
552 Ok(runs) => {
553 yield ArborEvent::TextRunsDetected {
554 tree_id,
555 runs,
556 };
557 }
558 Err(e) => {
559 eprintln!("Error detecting text runs: {}", e.message);
560 yield ArborEvent::Err { message: e.to_string() };
561 }
562 }
563 }
564 }
565
566 async fn view_collapse_text_runs(
568 &self,
569 source_tree_id: TreeId,
570 min_run_length: u32,
571 owner_id: String,
572 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
573 let storage = self.storage.clone();
574
575 stream! {
576 match storage.view_collapse_text_runs(&source_tree_id, min_run_length as usize, &owner_id).await {
577 Ok((view_tree_id, runs)) => {
578 yield ArborEvent::ViewCollapsed {
579 view_tree_id,
580 source_tree_id,
581 collapsed_runs: runs,
582 };
583 }
584 Err(e) => {
585 eprintln!("Error collapsing text runs: {}", e.message);
586 yield ArborEvent::Err { message: e.to_string() };
587 }
588 }
589 }
590 }
591
592 async fn range_get(
594 &self,
595 tree_id: TreeId,
596 start_node: NodeId,
597 end_node: NodeId,
598 collapse_type: crate::activations::arbor::CollapseType,
599 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
600 let storage = self.storage.clone();
601
602 stream! {
603 match storage.range_get(&tree_id, &start_node, &end_node, &collapse_type).await {
604 Ok(content) => {
605 yield ArborEvent::RangeContent {
606 tree_id,
607 start_node,
608 end_node,
609 content,
610 };
611 }
612 Err(e) => {
613 eprintln!("Error getting range: {}", e.message);
614 yield ArborEvent::Err { message: e.to_string() };
615 }
616 }
617 }
618 }
619}
620
621async fn resolve_handle_to_string<P: HubContext>(parent: &P, handle: &Handle) -> String {
623 match parent.resolve_handle(handle).await {
624 Ok(mut stream) => {
625 while let Some(item) = stream.next().await {
627 match item {
628 PlexusStreamItem::Data { content, .. } => {
629 return extract_display_content(&content);
631 }
632 PlexusStreamItem::Error { message, .. } => {
633 return format!("[error: {}]", message);
634 }
635 PlexusStreamItem::Done { .. } => break,
636 _ => continue,
637 }
638 }
639 format!("[empty: {}]", handle)
640 }
641 Err(e) => {
642 format!("[unresolved: {} - {}]", handle.method, e)
643 }
644 }
645}
646
647fn extract_display_content(content: &Value) -> String {
649 if let Some(msg_content) = content.get("content").and_then(|v| v.as_str()) {
653 let role = content.get("role").and_then(|v| v.as_str()).unwrap_or("unknown");
654 let name = content.get("name").and_then(|v| v.as_str());
655
656 let truncated = if msg_content.len() > 60 {
657 format!("{}...", &msg_content[..57])
658 } else {
659 msg_content.to_string()
660 };
661
662 return if let Some(n) = name {
663 format!("[{}:{}] {}", role, n, truncated.replace('\n', "↵"))
664 } else {
665 format!("[{}] {}", role, truncated.replace('\n', "↵"))
666 };
667 }
668
669 if let Some(type_str) = content.get("type").and_then(|v| v.as_str()) {
671 return format!("[{}]", type_str);
672 }
673
674 let json_str = content.to_string();
676 if json_str.len() > 50 {
677 format!("{}...", &json_str[..47])
678 } else {
679 json_str
680 }
681}