1use super::storage::{ArborConfig, ArborStorage};
2use super::types::{ArborEvent, Handle, NodeId, TreeId, TreeSkeleton};
3use crate::plexus::{HubContext, NoParent, PlexusStreamItem};
4use async_stream::stream;
5use futures::{Stream, StreamExt};
6use plexus_macros::hub_methods;
7use serde_json::Value;
8use std::marker::PhantomData;
9use std::sync::{Arc, OnceLock};
10
11#[derive(Clone)]
16pub struct Arbor<P: HubContext = NoParent> {
17 storage: Arc<ArborStorage>,
18 hub: Arc<OnceLock<P>>,
20 _phantom: PhantomData<P>,
21}
22
23impl<P: HubContext> Arbor<P> {
24 pub async fn with_context_type(config: ArborConfig) -> Result<Self, String> {
26 let storage = ArborStorage::new(config)
27 .await
28 .map_err(|e| format!("Failed to initialize Arbor storage: {}", e.message))?;
29
30 Ok(Self {
31 storage: Arc::new(storage),
32 hub: Arc::new(OnceLock::new()),
33 _phantom: PhantomData,
34 })
35 }
36
37 pub fn with_storage(storage: Arc<ArborStorage>) -> Self {
39 Self {
40 storage,
41 hub: Arc::new(OnceLock::new()),
42 _phantom: PhantomData,
43 }
44 }
45
46 pub fn storage(&self) -> Arc<ArborStorage> {
48 self.storage.clone()
49 }
50
51 pub fn inject_parent(&self, parent: P) {
55 let _ = self.hub.set(parent);
56 }
57
58 pub fn has_parent(&self) -> bool {
60 self.hub.get().is_some()
61 }
62
63 pub fn parent(&self) -> Option<&P> {
65 self.hub.get()
66 }
67}
68
69impl Arbor<NoParent> {
71 pub async fn new(config: ArborConfig) -> Result<Self, String> {
72 Self::with_context_type(config).await
73 }
74}
75
76#[hub_methods(
77 namespace = "arbor",
78 version = "1.0.0",
79 description = "Manage conversation trees with context tracking"
80)]
81impl<P: HubContext> Arbor<P> {
82 #[plexus_macros::hub_method(params(
84 metadata = "Optional tree-level metadata (name, description, etc.)",
85 owner_id = "Owner identifier (default: 'system')"
86 ))]
87 async fn tree_create(
88 &self,
89 metadata: Option<Value>,
90 owner_id: String,
91 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
92 let storage = self.storage.clone();
93 stream! {
94 match storage.tree_create(metadata, &owner_id).await {
95 Ok(tree_id) => yield ArborEvent::TreeCreated { tree_id },
96 Err(e) => {
97 eprintln!("Error creating tree: {}", e.message);
98 yield ArborEvent::TreeCreated { tree_id: TreeId::nil() };
99 }
100 }
101 }
102 }
103
104 #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to retrieve"))]
106 async fn tree_get(&self, tree_id: TreeId) -> impl Stream<Item = ArborEvent> + Send + 'static {
107 let storage = self.storage.clone();
108 stream! {
109 match storage.tree_get(&tree_id).await {
110 Ok(tree) => yield ArborEvent::TreeData { tree },
111 Err(e) => {
112 eprintln!("Error getting tree: {}", e.message);
113 yield ArborEvent::TreeList { tree_ids: vec![] };
114 }
115 }
116 }
117 }
118
119 #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to retrieve"))]
121 async fn tree_get_skeleton(
122 &self,
123 tree_id: TreeId,
124 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
125 let storage = self.storage.clone();
126 stream! {
127 match storage.tree_get(&tree_id).await {
128 Ok(tree) => yield ArborEvent::TreeSkeleton { skeleton: TreeSkeleton::from(&tree) },
129 Err(e) => {
130 eprintln!("Error getting tree skeleton: {}", e.message);
131 yield ArborEvent::TreeList { tree_ids: vec![] };
132 }
133 }
134 }
135 }
136
137 #[plexus_macros::hub_method]
139 async fn tree_list(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
140 let storage = self.storage.clone();
141 stream! {
142 match storage.tree_list(false).await {
143 Ok(tree_ids) => yield ArborEvent::TreeList { tree_ids },
144 Err(e) => {
145 eprintln!("Error listing trees: {}", e.message);
146 yield ArborEvent::TreeList { tree_ids: vec![] };
147 }
148 }
149 }
150 }
151
152 #[plexus_macros::hub_method(params(
154 tree_id = "UUID of the tree to update",
155 metadata = "New metadata to set"
156 ))]
157 async fn tree_update_metadata(
158 &self,
159 tree_id: TreeId,
160 metadata: Value,
161 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
162 let storage = self.storage.clone();
163 stream! {
164 match storage.tree_update_metadata(&tree_id, metadata).await {
165 Ok(_) => yield ArborEvent::TreeUpdated { tree_id },
166 Err(e) => {
167 eprintln!("Error updating tree metadata: {}", e.message);
168 yield ArborEvent::TreeList { tree_ids: vec![] };
169 }
170 }
171 }
172 }
173
174 #[plexus_macros::hub_method(params(
176 tree_id = "UUID of the tree to claim",
177 owner_id = "Owner identifier",
178 count = "Number of references to add (default: 1)"
179 ))]
180 async fn tree_claim(
181 &self,
182 tree_id: TreeId,
183 owner_id: String,
184 count: i64,
185 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
186 let storage = self.storage.clone();
187 stream! {
188 match storage.tree_claim(&tree_id, &owner_id, count).await {
189 Ok(new_count) => yield ArborEvent::TreeClaimed { tree_id, owner_id, new_count },
190 Err(e) => {
191 eprintln!("Error claiming tree: {}", e.message);
192 yield ArborEvent::TreeList { tree_ids: vec![] };
193 }
194 }
195 }
196 }
197
198 #[plexus_macros::hub_method(params(
200 tree_id = "UUID of the tree to release",
201 owner_id = "Owner identifier",
202 count = "Number of references to remove (default: 1)"
203 ))]
204 async fn tree_release(
205 &self,
206 tree_id: TreeId,
207 owner_id: String,
208 count: i64,
209 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
210 let storage = self.storage.clone();
211 stream! {
212 match storage.tree_release(&tree_id, &owner_id, count).await {
213 Ok(new_count) => yield ArborEvent::TreeReleased { tree_id, owner_id, new_count },
214 Err(e) => {
215 eprintln!("Error releasing tree: {}", e.message);
216 yield ArborEvent::TreeList { tree_ids: vec![] };
217 }
218 }
219 }
220 }
221
222 #[plexus_macros::hub_method]
224 async fn tree_list_scheduled(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
225 let storage = self.storage.clone();
226 stream! {
227 match storage.tree_list(true).await {
228 Ok(tree_ids) => yield ArborEvent::TreesScheduled { tree_ids },
229 Err(e) => {
230 eprintln!("Error listing scheduled trees: {}", e.message);
231 yield ArborEvent::TreesScheduled { tree_ids: vec![] };
232 }
233 }
234 }
235 }
236
237 #[plexus_macros::hub_method]
239 async fn tree_list_archived(&self) -> impl Stream<Item = ArborEvent> + Send + 'static {
240 let storage = self.storage.clone();
241 stream! {
242 match storage.tree_list(true).await {
243 Ok(tree_ids) => yield ArborEvent::TreesArchived { tree_ids },
244 Err(e) => {
245 eprintln!("Error listing archived trees: {}", e.message);
246 yield ArborEvent::TreesArchived { tree_ids: vec![] };
247 }
248 }
249 }
250 }
251
252 #[plexus_macros::hub_method(params(
254 tree_id = "UUID of the tree",
255 parent = "Parent node ID (None for root-level)",
256 content = "Text content for the node",
257 metadata = "Optional node metadata"
258 ))]
259 async fn node_create_text(
260 &self,
261 tree_id: TreeId,
262 parent: Option<NodeId>,
263 content: String,
264 metadata: Option<Value>,
265 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
266 let storage = self.storage.clone();
267 stream! {
268 match storage.node_create_text(&tree_id, parent, content, metadata).await {
269 Ok(node_id) => yield ArborEvent::NodeCreated { tree_id, node_id, parent },
270 Err(e) => {
271 eprintln!("Error creating text node: {}", e.message);
272 yield ArborEvent::TreeList { tree_ids: vec![] };
273 }
274 }
275 }
276 }
277
278 #[plexus_macros::hub_method(params(
280 tree_id = "UUID of the tree",
281 parent = "Parent node ID (None for root-level)",
282 handle = "Handle to external data",
283 metadata = "Optional node metadata"
284 ))]
285 async fn node_create_external(
286 &self,
287 tree_id: TreeId,
288 parent: Option<NodeId>,
289 handle: Handle,
290 metadata: Option<Value>,
291 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
292 let storage = self.storage.clone();
293 stream! {
294 match storage.node_create_external(&tree_id, parent, handle, metadata).await {
295 Ok(node_id) => yield ArborEvent::NodeCreated { tree_id, node_id, parent },
296 Err(e) => {
297 eprintln!("Error creating external node: {}", e.message);
298 yield ArborEvent::TreeList { tree_ids: vec![] };
299 }
300 }
301 }
302 }
303
304 #[plexus_macros::hub_method(params(
306 tree_id = "UUID of the tree",
307 node_id = "UUID of the node"
308 ))]
309 async fn node_get(
310 &self,
311 tree_id: TreeId,
312 node_id: NodeId,
313 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
314 let storage = self.storage.clone();
315 stream! {
316 match storage.node_get(&tree_id, &node_id).await {
317 Ok(node) => yield ArborEvent::NodeData { tree_id, node },
318 Err(e) => {
319 eprintln!("Error getting node: {}", e.message);
320 yield ArborEvent::TreeList { tree_ids: vec![] };
321 }
322 }
323 }
324 }
325
326 #[plexus_macros::hub_method(params(
328 tree_id = "UUID of the tree",
329 node_id = "UUID of the node"
330 ))]
331 async fn node_get_children(
332 &self,
333 tree_id: TreeId,
334 node_id: NodeId,
335 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
336 let storage = self.storage.clone();
337 stream! {
338 match storage.node_get_children(&tree_id, &node_id).await {
339 Ok(children) => yield ArborEvent::NodeChildren { tree_id, node_id, children },
340 Err(e) => {
341 eprintln!("Error getting node children: {}", e.message);
342 yield ArborEvent::TreeList { tree_ids: vec![] };
343 }
344 }
345 }
346 }
347
348 #[plexus_macros::hub_method(params(
350 tree_id = "UUID of the tree",
351 node_id = "UUID of the node"
352 ))]
353 async fn node_get_parent(
354 &self,
355 tree_id: TreeId,
356 node_id: NodeId,
357 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
358 let storage = self.storage.clone();
359 stream! {
360 match storage.node_get_parent(&tree_id, &node_id).await {
361 Ok(parent) => yield ArborEvent::NodeParent { tree_id, node_id, parent },
362 Err(e) => {
363 eprintln!("Error getting node parent: {}", e.message);
364 yield ArborEvent::TreeList { tree_ids: vec![] };
365 }
366 }
367 }
368 }
369
370 #[plexus_macros::hub_method(params(
372 tree_id = "UUID of the tree",
373 node_id = "UUID of the node"
374 ))]
375 async fn node_get_path(
376 &self,
377 tree_id: TreeId,
378 node_id: NodeId,
379 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
380 let storage = self.storage.clone();
381 stream! {
382 match storage.node_get_path(&tree_id, &node_id).await {
383 Ok(path) => yield ArborEvent::ContextPath { tree_id, path },
384 Err(e) => {
385 eprintln!("Error getting node path: {}", e.message);
386 yield ArborEvent::TreeList { tree_ids: vec![] };
387 }
388 }
389 }
390 }
391
392 #[plexus_macros::hub_method(params(tree_id = "UUID of the tree"))]
394 async fn context_list_leaves(
395 &self,
396 tree_id: TreeId,
397 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
398 let storage = self.storage.clone();
399 stream! {
400 match storage.context_list_leaves(&tree_id).await {
401 Ok(leaves) => yield ArborEvent::ContextLeaves { tree_id, leaves },
402 Err(e) => {
403 eprintln!("Error listing leaf nodes: {}", e.message);
404 yield ArborEvent::TreeList { tree_ids: vec![] };
405 }
406 }
407 }
408 }
409
410 #[plexus_macros::hub_method(params(
412 tree_id = "UUID of the tree",
413 node_id = "UUID of the target node"
414 ))]
415 async fn context_get_path(
416 &self,
417 tree_id: TreeId,
418 node_id: NodeId,
419 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
420 let storage = self.storage.clone();
421 stream! {
422 match storage.context_get_path(&tree_id, &node_id).await {
423 Ok(nodes) => yield ArborEvent::ContextPathData { tree_id, nodes },
424 Err(e) => {
425 eprintln!("Error getting context path: {}", e.message);
426 yield ArborEvent::TreeList { tree_ids: vec![] };
427 }
428 }
429 }
430 }
431
432 #[plexus_macros::hub_method(params(
434 tree_id = "UUID of the tree",
435 node_id = "UUID of the target node"
436 ))]
437 async fn context_get_handles(
438 &self,
439 tree_id: TreeId,
440 node_id: NodeId,
441 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
442 let storage = self.storage.clone();
443 stream! {
444 match storage.context_get_handles(&tree_id, &node_id).await {
445 Ok(handles) => yield ArborEvent::ContextHandles { tree_id, handles },
446 Err(e) => {
447 eprintln!("Error getting context handles: {}", e.message);
448 yield ArborEvent::TreeList { tree_ids: vec![] };
449 }
450 }
451 }
452 }
453
454 #[plexus_macros::hub_method(params(tree_id = "UUID of the tree to render"))]
459 async fn tree_render(
460 &self,
461 tree_id: TreeId,
462 ) -> impl Stream<Item = ArborEvent> + Send + 'static {
463 let storage = self.storage.clone();
464 let hub = self.hub.clone();
465
466 stream! {
467 match storage.tree_get(&tree_id).await {
468 Ok(tree) => {
469 let render = if let Some(parent) = hub.get() {
471 tree.render_resolved(|handle| {
473 let parent = parent.clone();
474 let handle = handle.clone();
475 async move {
476 resolve_handle_to_string(&parent, &handle).await
477 }
478 }).await
479 } else {
480 tree.render()
482 };
483 yield ArborEvent::TreeRender { tree_id, render };
484 }
485 Err(e) => {
486 eprintln!("Error rendering tree: {}", e.message);
487 yield ArborEvent::TreeRender { tree_id, render: format!("Error: {}", e.message) };
488 }
489 }
490 }
491 }
492}
493
494async fn resolve_handle_to_string<P: HubContext>(parent: &P, handle: &Handle) -> String {
496 match parent.resolve_handle(handle).await {
497 Ok(mut stream) => {
498 while let Some(item) = stream.next().await {
500 match item {
501 PlexusStreamItem::Data { content, .. } => {
502 return extract_display_content(&content);
504 }
505 PlexusStreamItem::Error { message, .. } => {
506 return format!("[error: {}]", message);
507 }
508 PlexusStreamItem::Done { .. } => break,
509 _ => continue,
510 }
511 }
512 format!("[empty: {}]", handle)
513 }
514 Err(e) => {
515 format!("[unresolved: {} - {}]", handle.method, e)
516 }
517 }
518}
519
520fn extract_display_content(content: &Value) -> String {
522 if let Some(msg_content) = content.get("content").and_then(|v| v.as_str()) {
526 let role = content.get("role").and_then(|v| v.as_str()).unwrap_or("unknown");
527 let name = content.get("name").and_then(|v| v.as_str());
528
529 let truncated = if msg_content.len() > 60 {
530 format!("{}...", &msg_content[..57])
531 } else {
532 msg_content.to_string()
533 };
534
535 return if let Some(n) = name {
536 format!("[{}:{}] {}", role, n, truncated.replace('\n', "↵"))
537 } else {
538 format!("[{}] {}", role, truncated.replace('\n', "↵"))
539 };
540 }
541
542 if let Some(type_str) = content.get("type").and_then(|v| v.as_str()) {
544 return format!("[{}]", type_str);
545 }
546
547 let json_str = content.to_string();
549 if json_str.len() > 50 {
550 format!("{}...", &json_str[..47])
551 } else {
552 json_str
553 }
554}