1use super::methods::ConeIdentifier;
2use super::storage::{ConeStorage, ConeStorageConfig};
3use super::types::{
4 ChatEvent, ChatUsage, CreateResult, DeleteResult, GetResult,
5 ListResult, MessageRole, RegistryResult, ResolveResult, SetHeadResult,
6};
7use crate::activations::arbor::{Node, NodeId, NodeType};
8use crate::activations::bash::Bash;
9use crate::plexus::{HubContext, NoParent};
10use async_stream::stream;
11use cllient::{Message, ModelRegistry};
12use futures::Stream;
13use plexus_macros::hub_methods;
14use std::marker::PhantomData;
15use std::sync::{Arc, OnceLock};
16
/// An LLM "cone": a persistent conversation context backed by `ConeStorage`
/// (messages + arbor tree position) and a `ModelRegistry` for building LLM
/// requests. Generic over the parent hub context `P`, defaulting to
/// `NoParent` for standalone use. Cloning is cheap: all fields are `Arc`s.
#[derive(Clone)]
pub struct Cone<P: HubContext = NoParent> {
    // Persistent cone/message storage, shared across clones.
    storage: Arc<ConeStorage>,
    // Registry used to validate model ids and build request builders.
    llm_registry: Arc<ModelRegistry>,
    // Parent hub context; set at most once via `inject_parent`.
    hub: Arc<OnceLock<P>>,
    // NOTE(review): `P` already appears in the `hub` field above, so this
    // PhantomData looks redundant — confirm before removing (the constructor
    // initializes it, so both sites would need to change together).
    _phantom: PhantomData<P>,
}
31
32impl<P: HubContext> Cone<P> {
33 pub async fn with_context_type(
35 config: ConeStorageConfig,
36 arbor: Arc<crate::activations::arbor::ArborStorage>,
37 ) -> Result<Self, String> {
38 let storage = ConeStorage::new(config, arbor)
39 .await
40 .map_err(|e| format!("Failed to initialize cone storage: {}", e.message))?;
41
42 let llm_registry = ModelRegistry::new()
43 .map_err(|e| format!("Failed to initialize LLM registry: {}", e))?;
44
45 Ok(Self {
46 storage: Arc::new(storage),
47 llm_registry: Arc::new(llm_registry),
48 hub: Arc::new(OnceLock::new()),
49 _phantom: PhantomData,
50 })
51 }
52
53 pub fn inject_parent(&self, parent: P) {
58 let _ = self.hub.set(parent);
59 }
60
61 pub fn has_parent(&self) -> bool {
63 self.hub.get().is_some()
64 }
65
66 pub fn parent(&self) -> Option<&P> {
70 self.hub.get()
71 }
72
73 pub fn storage(&self) -> &Arc<ConeStorage> {
77 &self.storage
78 }
79}
80
81impl Cone<NoParent> {
83 pub async fn new(
84 config: ConeStorageConfig,
85 arbor: Arc<crate::activations::arbor::ArborStorage>,
86 ) -> Result<Self, String> {
87 Self::with_context_type(config, arbor).await
88 }
89
90 pub async fn register_default_templates(
95 &self,
96 mustache: &crate::activations::mustache::Mustache,
97 ) -> Result<(), String> {
98 let plugin_id = Self::PLUGIN_ID;
99
100 mustache.register_templates(plugin_id, &[
101 ("chat", "default", "[{{role}}] {{#name}}({{name}}) {{/name}}{{content}}"),
103 ("chat", "markdown", "**{{role}}**{{#name}} ({{name}}){{/name}}\n\n{{content}}"),
104 ("chat", "json", r#"{"role":"{{role}}","content":"{{content}}","name":"{{name}}"}"#),
105
106 ("create", "default", "Cone created: {{cone_id}} (head: {{head.tree_id}}/{{head.node_id}})"),
108
109 ("list", "default", "{{#cones}}{{name}} ({{id}}) - {{model_id}}\n{{/cones}}"),
111 ]).await
112 }
113}
114
115impl<P: HubContext> Cone<P> {
116 pub async fn resolve_handle_impl(
121 &self,
122 handle: &crate::types::Handle,
123 ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
124 use crate::plexus::{PlexusError, wrap_stream};
125 use async_stream::stream;
126
127 let storage = self.storage.clone();
128
129 if handle.meta.is_empty() {
132 return Err(PlexusError::ExecutionError(
133 "Cone handle missing message ID in meta".to_string()
134 ));
135 }
136 let identifier = handle.meta.join(":");
137
138 let name = handle.meta.get(2).cloned();
140
141 let result_stream = stream! {
142 match storage.resolve_message_handle(&identifier).await {
143 Ok(message) => {
144 yield ResolveResult::Message {
145 id: message.id.to_string(),
146 role: message.role.as_str().to_string(),
147 content: message.content,
148 model: message.model_id,
149 name: name.unwrap_or_else(|| message.role.as_str().to_string()),
150 };
151 }
152 Err(e) => {
153 yield ResolveResult::Error {
154 message: format!("Failed to resolve handle: {}", e.message),
155 };
156 }
157 }
158 };
159
160 Ok(wrap_stream(result_stream, "cone.resolve_handle", vec!["cone".into()]))
161 }
162}
163
#[hub_methods(
    namespace = "cone",
    version = "1.0.0",
    description = "LLM cone with persistent conversation context",
    resolve_handle
)]
impl<P: HubContext> Cone<P> {
    /// Create a new cone. The `model_id` is validated against the LLM
    /// registry before any storage write happens.
    #[plexus_macros::hub_method(
        params(
            name = "Human-readable name for the cone",
            model_id = "LLM model ID (e.g., 'gpt-4o-mini', 'claude-3-haiku-20240307')",
            system_prompt = "Optional system prompt / instructions",
            metadata = "Optional configuration metadata"
        )
    )]
    async fn create(
        &self,
        name: String,
        model_id: String,
        system_prompt: Option<String>,
        metadata: Option<serde_json::Value>,
    ) -> impl Stream<Item = CreateResult> + Send + 'static {
        let storage = self.storage.clone();
        let llm_registry = self.llm_registry.clone();

        stream! {
            // Reject unknown model ids up front so we never persist a cone
            // that cannot produce a request builder later.
            if let Err(e) = llm_registry.from_id(&model_id) {
                yield CreateResult::Error {
                    message: format!("Invalid model_id '{}': {}", model_id, e)
                };
                return;
            }

            match storage.cone_create(name, model_id, system_prompt, metadata).await {
                Ok(cone) => {
                    yield CreateResult::Created {
                        cone_id: cone.id,
                        head: cone.head,
                    };
                }
                Err(e) => {
                    yield CreateResult::Error { message: e.message };
                }
            }
        }
    }

    /// Fetch a cone record by name or UUID.
    #[plexus_macros::hub_method(
        params(identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')")
    )]
    async fn get(
        &self,
        identifier: ConeIdentifier,
    ) -> impl Stream<Item = GetResult> + Send + 'static {
        let storage = self.storage.clone();

        stream! {
            // Resolve a name-or-UUID identifier to the canonical cone id.
            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
                Ok(id) => id,
                Err(e) => {
                    yield GetResult::Error { message: e.message };
                    return;
                }
            };

            match storage.cone_get(&cone_id).await {
                Ok(cone) => {
                    yield GetResult::Data { cone };
                }
                Err(e) => {
                    yield GetResult::Error { message: e.message };
                }
            }
        }
    }

    /// List all cones.
    #[plexus_macros::hub_method]
    async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
        let storage = self.storage.clone();

        stream! {
            match storage.cone_list().await {
                Ok(cones) => {
                    yield ListResult::List { cones };
                }
                Err(e) => {
                    yield ListResult::Error { message: e.message };
                }
            }
        }
    }

    /// Delete a cone by name or UUID.
    #[plexus_macros::hub_method(
        params(identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')")
    )]
    async fn delete(
        &self,
        identifier: ConeIdentifier,
    ) -> impl Stream<Item = DeleteResult> + Send + 'static {
        let storage = self.storage.clone();

        stream! {
            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
                Ok(id) => id,
                Err(e) => {
                    yield DeleteResult::Error { message: e.message };
                    return;
                }
            };

            match storage.cone_delete(&cone_id).await {
                Ok(()) => {
                    yield DeleteResult::Deleted { cone_id };
                }
                Err(e) => {
                    yield DeleteResult::Error { message: e.message };
                }
            }
        }
    }

    /// Send a prompt to the cone's LLM and stream the response.
    ///
    /// Flow: resolve cone -> load head context path -> persist user message
    /// and arbor node -> stream LLM output -> persist assistant message and
    /// node -> advance head (unless ephemeral). Ephemeral chats use the
    /// `*_ephemeral` storage variants, which (per the param doc below) do
    /// not advance the head and mark the created nodes for deletion.
    #[plexus_macros::hub_method(
        streaming,
        params(
            identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')",
            prompt = "User message / prompt to send to the LLM",
            ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
        )
    )]
    async fn chat(
        &self,
        identifier: ConeIdentifier,
        prompt: String,
        ephemeral: Option<bool>,
    ) -> impl Stream<Item = ChatEvent> + Send + 'static {
        let storage = self.storage.clone();
        let llm_registry = self.llm_registry.clone();

        stream! {
            let is_ephemeral = ephemeral.unwrap_or(false);

            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
                Ok(id) => id,
                Err(e) => {
                    yield ChatEvent::Error { message: e.message };
                    return;
                }
            };

            // Load the cone record: model id, head position, system prompt.
            let cone = match storage.cone_get(&cone_id).await {
                Ok(a) => a,
                Err(e) => {
                    yield ChatEvent::Error { message: format!("Failed to get cone: {}", e.message) };
                    return;
                }
            };

            // Path of arbor nodes from the tree root to the current head —
            // the conversation history for this branch.
            let context_nodes = match storage.arbor().context_get_path(&cone.head.tree_id, &cone.head.node_id).await {
                Ok(nodes) => nodes,
                Err(e) => {
                    yield ChatEvent::Error { message: format!("Failed to get context path: {}", e) };
                    return;
                }
            };

            // Convert stored nodes (cone messages, bash outputs, other
            // external refs) into LLM `Message`s.
            let messages = match resolve_context_to_messages(&storage, &context_nodes, &cone.system_prompt).await {
                Ok(msgs) => msgs,
                Err(e) => {
                    yield ChatEvent::Error { message: format!("Failed to resolve context: {}", e) };
                    return;
                }
            };

            // Persist the user's prompt as a message row.
            let user_message = if is_ephemeral {
                match storage.message_create_ephemeral(
                    &cone_id,
                    MessageRole::User,
                    prompt.clone(),
                    None,
                    None,
                    None,
                ).await {
                    Ok(msg) => msg,
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Failed to store user message: {}", e.message) };
                        return;
                    }
                }
            } else {
                match storage.message_create(
                    &cone_id,
                    MessageRole::User,
                    prompt.clone(),
                    None,
                    None,
                    None,
                ).await {
                    Ok(msg) => msg,
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Failed to store user message: {}", e.message) };
                        return;
                    }
                }
            };

            // Attach the stored message to the arbor tree as a child of the
            // current head node.
            let user_handle = ConeStorage::message_to_handle(&user_message, "user");
            let user_node_id = if is_ephemeral {
                match storage.arbor().node_create_external_ephemeral(
                    &cone.head.tree_id,
                    Some(cone.head.node_id),
                    user_handle,
                    None,
                ).await {
                    Ok(id) => id,
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Failed to create user node: {}", e) };
                        return;
                    }
                }
            } else {
                match storage.arbor().node_create_external(
                    &cone.head.tree_id,
                    Some(cone.head.node_id),
                    user_handle,
                    None,
                ).await {
                    Ok(id) => id,
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Failed to create user node: {}", e) };
                        return;
                    }
                }
            };

            // In-memory advance only; the persisted head is updated after
            // the assistant response below.
            let user_position = cone.head.advance(user_node_id);

            yield ChatEvent::Start {
                cone_id,
                user_position,
            };

            // History plus the new prompt form the LLM request.
            let mut llm_messages = messages;
            llm_messages.push(Message::user(&prompt));

            let request_builder = match llm_registry.from_id(&cone.model_id) {
                Ok(rb) => rb,
                Err(e) => {
                    yield ChatEvent::Error { message: format!("Failed to create request builder: {}", e) };
                    return;
                }
            };

            let mut builder = request_builder;
            if let Some(ref sys) = cone.system_prompt {
                builder = builder.system(sys);
            }
            builder = builder.messages(llm_messages);

            let mut stream_result = match builder.stream().await {
                Ok(s) => s,
                Err(e) => {
                    yield ChatEvent::Error { message: format!("Failed to start LLM stream: {}", e) };
                    return;
                }
            };

            // Accumulate the full response text and any usage counters while
            // forwarding content chunks to the caller.
            let mut full_response = String::new();
            let mut input_tokens: Option<i64> = None;
            let mut output_tokens: Option<i64> = None;

            use futures::StreamExt;
            while let Some(event) = stream_result.next().await {
                match event {
                    Ok(cllient::streaming::StreamEvent::Content(text)) => {
                        full_response.push_str(&text);
                        yield ChatEvent::Content {
                            cone_id,
                            content: text,
                        };
                    }
                    Ok(cllient::streaming::StreamEvent::Usage { input_tokens: inp, output_tokens: out, .. }) => {
                        input_tokens = inp.map(|t| t as i64);
                        output_tokens = out.map(|t| t as i64);
                    }
                    Ok(cllient::streaming::StreamEvent::Error(e)) => {
                        yield ChatEvent::Error { message: format!("LLM error: {}", e) };
                        return;
                    }
                    // Other stream events are deliberately ignored.
                    Ok(_) => {
                    }
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Stream error: {}", e) };
                        return;
                    }
                }
            }

            // Persist the assistant response with model id and token counts.
            let assistant_message = if is_ephemeral {
                match storage.message_create_ephemeral(
                    &cone_id,
                    MessageRole::Assistant,
                    full_response,
                    Some(cone.model_id.clone()),
                    input_tokens,
                    output_tokens,
                ).await {
                    Ok(msg) => msg,
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Failed to store assistant message: {}", e.message) };
                        return;
                    }
                }
            } else {
                match storage.message_create(
                    &cone_id,
                    MessageRole::Assistant,
                    full_response,
                    Some(cone.model_id.clone()),
                    input_tokens,
                    output_tokens,
                ).await {
                    Ok(msg) => msg,
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Failed to store assistant message: {}", e.message) };
                        return;
                    }
                }
            };

            // Response node is a child of the user node created above.
            let assistant_handle = ConeStorage::message_to_handle(&assistant_message, &cone.name);
            let response_node_id = if is_ephemeral {
                match storage.arbor().node_create_external_ephemeral(
                    &cone.head.tree_id,
                    Some(user_node_id),
                    assistant_handle,
                    None,
                ).await {
                    Ok(id) => id,
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Failed to create response node: {}", e) };
                        return;
                    }
                }
            } else {
                match storage.arbor().node_create_external(
                    &cone.head.tree_id,
                    Some(user_node_id),
                    assistant_handle,
                    None,
                ).await {
                    Ok(id) => id,
                    Err(e) => {
                        yield ChatEvent::Error { message: format!("Failed to create response node: {}", e) };
                        return;
                    }
                }
            };

            let new_head = user_position.advance(response_node_id);

            // Persist the advanced head only for non-ephemeral chats.
            // NOTE(review): if this update fails, the message/node writes
            // above are not rolled back here — confirm that is intended.
            if !is_ephemeral {
                if let Err(e) = storage.cone_update_head(&cone_id, response_node_id).await {
                    yield ChatEvent::Error { message: format!("Failed to update head: {}", e.message) };
                    return;
                }
            }

            // `total_tokens` is only populated when both counters arrived.
            let usage_info = if input_tokens.is_some() || output_tokens.is_some() {
                Some(ChatUsage {
                    input_tokens: input_tokens.map(|t| t as u64),
                    output_tokens: output_tokens.map(|t| t as u64),
                    total_tokens: input_tokens.and_then(|i| output_tokens.map(|o| (i + o) as u64)),
                })
            } else {
                None
            };

            // Ephemeral chats report the unchanged head.
            yield ChatEvent::Complete {
                cone_id,
                new_head: if is_ephemeral { cone.head } else { new_head },
                usage: usage_info,
            };
        }
    }

    /// Move the cone's head to an arbitrary node (e.g. to branch the
    /// conversation). Reports both the old and new positions.
    #[plexus_macros::hub_method(
        params(
            identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')",
            node_id = "UUID of the target node to set as the new head"
        )
    )]
    async fn set_head(
        &self,
        identifier: ConeIdentifier,
        node_id: NodeId,
    ) -> impl Stream<Item = SetHeadResult> + Send + 'static {
        let storage = self.storage.clone();

        stream! {
            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
                Ok(id) => id,
                Err(e) => {
                    yield SetHeadResult::Error { message: e.message };
                    return;
                }
            };

            // Capture the current head so the result can report the change.
            let old_head = match storage.cone_get(&cone_id).await {
                Ok(cone) => cone.head,
                Err(e) => {
                    yield SetHeadResult::Error { message: e.message };
                    return;
                }
            };

            let new_head = old_head.advance(node_id);

            match storage.cone_update_head(&cone_id, node_id).await {
                Ok(()) => {
                    yield SetHeadResult::Updated {
                        cone_id,
                        old_head,
                        new_head,
                    };
                }
                Err(e) => {
                    yield SetHeadResult::Error { message: e.message };
                }
            }
        }
    }

    /// Export the LLM model registry contents.
    #[plexus_macros::hub_method]
    async fn registry(&self) -> impl Stream<Item = RegistryResult> + Send + 'static {
        let llm_registry = self.llm_registry.clone();

        stream! {
            let export = llm_registry.export();
            yield RegistryResult::Registry(export);
        }
    }
}
633
634async fn resolve_context_to_messages(
636 storage: &ConeStorage,
637 nodes: &[Node],
638 _system_prompt: &Option<String>,
639) -> Result<Vec<Message>, String> {
640 let mut messages = Vec::new();
641
642 for node in nodes {
643 match &node.data {
644 NodeType::Text { content } => {
645 if !content.is_empty() {
648 messages.push(Message::user(content));
649 }
650 }
651 NodeType::External { handle } => {
652 if handle.plugin_id == Cone::<NoParent>::PLUGIN_ID {
655 let identifier = handle.meta.join(":");
657 let msg = storage
658 .resolve_message_handle(&identifier)
659 .await
660 .map_err(|e| format!("Failed to resolve message handle: {}", e.message))?;
661
662 let cllient_msg = match msg.role {
663 MessageRole::User => Message::user(&msg.content),
664 MessageRole::Assistant => Message::assistant(&msg.content),
665 MessageRole::System => Message::system(&msg.content),
666 };
667 messages.push(cllient_msg);
668 } else if handle.plugin_id == Bash::PLUGIN_ID {
669 let cmd_id = handle.meta.first().map(|s| s.as_str()).unwrap_or("unknown");
671 messages.push(Message::user(&format!(
672 "[Tool output from bash: {}]",
673 cmd_id
674 )));
675 } else {
676 messages.push(Message::user(&format!(
678 "[External reference: {}]",
679 handle
680 )));
681 }
682 }
683 }
684 }
685
686 Ok(messages)
687}