1use super::methods::ConeIdentifier;
2use super::storage::{ConeStorage, ConeStorageConfig};
3use super::types::{
4 ChatEvent, ChatUsage, CreateResult, DeleteResult, GetResult,
5 ListResult, MessageRole, RegistryResult, ResolveResult, SetHeadResult,
6};
7use crate::activations::arbor::{Node, NodeId, NodeType};
8use crate::activations::bash::Bash;
9use crate::plexus::{HubContext, NoParent};
10use async_stream::stream;
11use cllient::{Message, ModelRegistry};
12use futures::Stream;
13use plexus_macros::activation;
14use std::marker::PhantomData;
15use std::sync::{Arc, OnceLock};
16
/// LLM cone activation: an LLM-backed conversation whose context is persisted
/// through [`ConeStorage`].
///
/// `P` is the parent hub context type and defaults to [`NoParent`]. Every field
/// is reference-counted, so cloning a `Cone` yields a handle that shares the same
/// storage, model registry and parent slot as the original.
#[derive(Clone)]
pub struct Cone<P: HubContext = NoParent> {
    /// Shared handle to the cone storage (cones, messages, and the arbor tree
    /// reached through `storage.arbor()`).
    storage: Arc<ConeStorage>,
    /// Registry used to validate model IDs and to obtain request builders.
    llm_registry: Arc<ModelRegistry>,
    /// Write-once slot for the parent context; filled by `inject_parent`.
    hub: Arc<OnceLock<P>>,
    /// Zero-sized marker for `P`.
    // NOTE(review): `P` already appears in `hub`, so this marker looks redundant —
    // confirm before removing, since the constructor initializes it by name.
    _phantom: PhantomData<P>,
}
31
32impl<P: HubContext> Cone<P> {
33 pub async fn with_context_type(
35 config: ConeStorageConfig,
36 arbor: Arc<crate::activations::arbor::ArborStorage>,
37 ) -> Result<Self, String> {
38 let storage = ConeStorage::new(config, arbor)
39 .await
40 .map_err(|e| format!("Failed to initialize cone storage: {}", e.to_string()))?;
41
42 let llm_registry = ModelRegistry::new()
43 .map_err(|e| format!("Failed to initialize LLM registry: {}", e))?;
44
45 Ok(Self {
46 storage: Arc::new(storage),
47 llm_registry: Arc::new(llm_registry),
48 hub: Arc::new(OnceLock::new()),
49 _phantom: PhantomData,
50 })
51 }
52
53 pub fn inject_parent(&self, parent: P) {
58 if self.hub.set(parent).is_err() {
59 tracing::warn!("Cone: inject_parent called but parent was already set");
60 }
61 }
62
63 pub fn has_parent(&self) -> bool {
65 self.hub.get().is_some()
66 }
67
68 pub fn parent(&self) -> Option<&P> {
72 self.hub.get()
73 }
74
75 pub fn storage(&self) -> &Arc<ConeStorage> {
79 &self.storage
80 }
81}
82
83impl Cone<NoParent> {
85 pub async fn new(
86 config: ConeStorageConfig,
87 arbor: Arc<crate::activations::arbor::ArborStorage>,
88 ) -> Result<Self, String> {
89 Self::with_context_type(config, arbor).await
90 }
91
92 pub async fn register_default_templates(
97 &self,
98 mustache: &crate::activations::mustache::Mustache,
99 ) -> Result<(), String> {
100 let plugin_id = Self::PLUGIN_ID;
101
102 mustache.register_templates(plugin_id, &[
103 ("chat", "default", "[{{role}}] {{#name}}({{name}}) {{/name}}{{content}}"),
105 ("chat", "markdown", "**{{role}}**{{#name}} ({{name}}){{/name}}\n\n{{content}}"),
106 ("chat", "json", r#"{"role":"{{role}}","content":"{{content}}","name":"{{name}}"}"#),
107
108 ("create", "default", "Cone created: {{cone_id}} (head: {{head.tree_id}}/{{head.node_id}})"),
110
111 ("list", "default", "{{#cones}}{{name}} ({{id}}) - {{model_id}}\n{{/cones}}"),
113 ]).await
114 }
115}
116
117impl<P: HubContext> Cone<P> {
118 pub async fn resolve_handle_impl(
123 &self,
124 handle: &crate::types::Handle,
125 ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
126 use crate::plexus::{PlexusError, wrap_stream};
127 use async_stream::stream;
128
129 let storage = self.storage.clone();
130
131 if handle.meta.is_empty() {
134 return Err(PlexusError::ExecutionError(
135 "Cone handle missing message ID in meta".to_string()
136 ));
137 }
138 let identifier = handle.meta.join(":");
139
140 let name = handle.meta.get(2).cloned();
142
143 let result_stream = stream! {
144 match storage.resolve_message_handle(&identifier).await {
145 Ok(message) => {
146 yield ResolveResult::Message {
147 id: message.id.to_string(),
148 role: message.role.as_str().to_string(),
149 content: message.content,
150 model: message.model_id,
151 name: name.unwrap_or_else(|| message.role.as_str().to_string()),
152 };
153 }
154 Err(e) => {
155 yield ResolveResult::Error {
156 message: format!("Failed to resolve handle: {}", e.to_string()),
157 };
158 }
159 }
160 };
161
162 Ok(wrap_stream(result_stream, "cone.resolve_handle", vec!["cone".into()]))
163 }
164}
165
166#[plexus_macros::activation(namespace = "cone",
167version = "1.0.0",
168description = "LLM cone with persistent conversation context",
169resolve_handle, crate_path = "plexus_core")]
170impl<P: HubContext> Cone<P> {
171 #[plexus_macros::method(params(
173 name = "Human-readable name for the cone",
174 model_id = "LLM model ID (e.g., 'gpt-4o-mini', 'claude-3-haiku-20240307')",
175 system_prompt = "Optional system prompt / instructions",
176 metadata = "Optional configuration metadata"
177 ))]
178 async fn create(
179 &self,
180 name: String,
181 model_id: String,
182 system_prompt: Option<String>,
183 metadata: Option<serde_json::Value>,
184 ) -> impl Stream<Item = CreateResult> + Send + 'static {
185 let storage = self.storage.clone();
186 let llm_registry = self.llm_registry.clone();
187
188 stream! {
189 if let Err(e) = llm_registry.from_id(&model_id) {
191 yield CreateResult::Error {
192 message: format!("Invalid model_id '{}': {}", model_id, e)
193 };
194 return;
195 }
196
197 match storage.cone_create(name, model_id, system_prompt, metadata).await {
198 Ok(cone) => {
199 yield CreateResult::Created {
200 cone_id: cone.id,
201 head: cone.head,
202 };
203 }
204 Err(e) => {
205 yield CreateResult::Error { message: e.to_string() };
206 }
207 }
208 }
209 }
210
211 #[plexus_macros::method(params(identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')"))]
213 async fn get(
214 &self,
215 identifier: ConeIdentifier,
216 ) -> impl Stream<Item = GetResult> + Send + 'static {
217 let storage = self.storage.clone();
218
219 stream! {
220 let cone_id = match storage.resolve_cone_identifier(&identifier).await {
222 Ok(id) => id,
223 Err(e) => {
224 yield GetResult::Error { message: e.to_string() };
225 return;
226 }
227 };
228
229 match storage.cone_get(&cone_id).await {
230 Ok(cone) => {
231 yield GetResult::Data { cone };
232 }
233 Err(e) => {
234 yield GetResult::Error { message: e.to_string() };
235 }
236 }
237 }
238 }
239
240 #[plexus_macros::method]
242 async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
243 let storage = self.storage.clone();
244
245 stream! {
246 match storage.cone_list().await {
247 Ok(cones) => {
248 yield ListResult::List { cones };
249 }
250 Err(e) => {
251 yield ListResult::Error { message: e.to_string() };
252 }
253 }
254 }
255 }
256
257 #[plexus_macros::method(params(identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')"))]
259 async fn delete(
260 &self,
261 identifier: ConeIdentifier,
262 ) -> impl Stream<Item = DeleteResult> + Send + 'static {
263 let storage = self.storage.clone();
264
265 stream! {
266 let cone_id = match storage.resolve_cone_identifier(&identifier).await {
268 Ok(id) => id,
269 Err(e) => {
270 yield DeleteResult::Error { message: e.to_string() };
271 return;
272 }
273 };
274
275 match storage.cone_delete(&cone_id).await {
276 Ok(()) => {
277 yield DeleteResult::Deleted { cone_id };
278 }
279 Err(e) => {
280 yield DeleteResult::Error { message: e.to_string() };
281 }
282 }
283 }
284 }
285
286 #[plexus_macros::method(streaming,
288 params(
289 identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')",
290 prompt = "User message / prompt to send to the LLM",
291 ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
292 ))]
293 async fn chat(
294 &self,
295 identifier: ConeIdentifier,
296 prompt: String,
297 ephemeral: Option<bool>,
298 ) -> impl Stream<Item = ChatEvent> + Send + 'static {
299 let storage = self.storage.clone();
300 let llm_registry = self.llm_registry.clone();
301
302 stream! {
303 let is_ephemeral = ephemeral.unwrap_or(false);
304
305 let cone_id = match storage.resolve_cone_identifier(&identifier).await {
307 Ok(id) => id,
308 Err(e) => {
309 yield ChatEvent::Error { message: e.to_string() };
310 return;
311 }
312 };
313
314 let cone = match storage.cone_get(&cone_id).await {
316 Ok(a) => a,
317 Err(e) => {
318 yield ChatEvent::Error { message: format!("Failed to get cone: {}", e.to_string()) };
319 return;
320 }
321 };
322
323 let context_nodes = match storage.arbor().context_get_path(&cone.head.tree_id, &cone.head.node_id).await {
325 Ok(nodes) => nodes,
326 Err(e) => {
327 yield ChatEvent::Error { message: format!("Failed to get context path: {}", e) };
328 return;
329 }
330 };
331
332 let messages = match resolve_context_to_messages(&storage, &context_nodes, &cone.system_prompt).await {
334 Ok(msgs) => msgs,
335 Err(e) => {
336 yield ChatEvent::Error { message: format!("Failed to resolve context: {}", e) };
337 return;
338 }
339 };
340
341 let user_message = if is_ephemeral {
343 match storage.message_create_ephemeral(
344 &cone_id,
345 MessageRole::User,
346 prompt.clone(),
347 None,
348 None,
349 None,
350 ).await {
351 Ok(msg) => msg,
352 Err(e) => {
353 yield ChatEvent::Error { message: format!("Failed to store user message: {}", e.to_string()) };
354 return;
355 }
356 }
357 } else {
358 match storage.message_create(
359 &cone_id,
360 MessageRole::User,
361 prompt.clone(),
362 None,
363 None,
364 None,
365 ).await {
366 Ok(msg) => msg,
367 Err(e) => {
368 yield ChatEvent::Error { message: format!("Failed to store user message: {}", e.to_string()) };
369 return;
370 }
371 }
372 };
373
374 let user_handle = ConeStorage::message_to_handle(&user_message, "user");
376 let user_node_id = if is_ephemeral {
377 match storage.arbor().node_create_external_ephemeral(
378 &cone.head.tree_id,
379 Some(cone.head.node_id),
380 user_handle,
381 None,
382 ).await {
383 Ok(id) => id,
384 Err(e) => {
385 yield ChatEvent::Error { message: format!("Failed to create user node: {}", e) };
386 return;
387 }
388 }
389 } else {
390 match storage.arbor().node_create_external(
391 &cone.head.tree_id,
392 Some(cone.head.node_id),
393 user_handle,
394 None,
395 ).await {
396 Ok(id) => id,
397 Err(e) => {
398 yield ChatEvent::Error { message: format!("Failed to create user node: {}", e) };
399 return;
400 }
401 }
402 };
403
404 let user_position = cone.head.advance(user_node_id);
405
406 yield ChatEvent::Start {
408 cone_id,
409 user_position,
410 };
411
412 let mut llm_messages = messages;
414 llm_messages.push(Message::user(&prompt));
415
416 let request_builder = match llm_registry.from_id(&cone.model_id) {
417 Ok(rb) => rb,
418 Err(e) => {
419 yield ChatEvent::Error { message: format!("Failed to create request builder: {}", e) };
420 return;
421 }
422 };
423
424 let mut builder = request_builder;
425 if let Some(ref sys) = cone.system_prompt {
426 builder = builder.system(sys);
427 }
428 builder = builder.messages(llm_messages);
429
430 let mut stream_result = match builder.stream().await {
432 Ok(s) => s,
433 Err(e) => {
434 yield ChatEvent::Error { message: format!("Failed to start LLM stream: {}", e) };
435 return;
436 }
437 };
438
439 let mut full_response = String::new();
440 let mut input_tokens: Option<i64> = None;
441 let mut output_tokens: Option<i64> = None;
442
443 use futures::StreamExt;
444 while let Some(event) = stream_result.next().await {
445 match event {
446 Ok(cllient::streaming::StreamEvent::Content(text)) => {
447 full_response.push_str(&text);
448 yield ChatEvent::Content {
449 cone_id,
450 content: text,
451 };
452 }
453 Ok(cllient::streaming::StreamEvent::Usage { input_tokens: inp, output_tokens: out, .. }) => {
454 input_tokens = inp.map(|t| t as i64);
455 output_tokens = out.map(|t| t as i64);
456 }
457 Ok(cllient::streaming::StreamEvent::Error(e)) => {
458 yield ChatEvent::Error { message: format!("LLM error: {}", e) };
459 return;
460 }
461 Ok(_) => {
462 }
464 Err(e) => {
465 yield ChatEvent::Error { message: format!("Stream error: {}", e) };
466 return;
467 }
468 }
469 }
470
471 let assistant_message = if is_ephemeral {
473 match storage.message_create_ephemeral(
474 &cone_id,
475 MessageRole::Assistant,
476 full_response,
477 Some(cone.model_id.clone()),
478 input_tokens,
479 output_tokens,
480 ).await {
481 Ok(msg) => msg,
482 Err(e) => {
483 yield ChatEvent::Error { message: format!("Failed to store assistant message: {}", e.to_string()) };
484 return;
485 }
486 }
487 } else {
488 match storage.message_create(
489 &cone_id,
490 MessageRole::Assistant,
491 full_response,
492 Some(cone.model_id.clone()),
493 input_tokens,
494 output_tokens,
495 ).await {
496 Ok(msg) => msg,
497 Err(e) => {
498 yield ChatEvent::Error { message: format!("Failed to store assistant message: {}", e.to_string()) };
499 return;
500 }
501 }
502 };
503
504 let assistant_handle = ConeStorage::message_to_handle(&assistant_message, &cone.name);
506 let response_node_id = if is_ephemeral {
507 match storage.arbor().node_create_external_ephemeral(
508 &cone.head.tree_id,
509 Some(user_node_id),
510 assistant_handle,
511 None,
512 ).await {
513 Ok(id) => id,
514 Err(e) => {
515 yield ChatEvent::Error { message: format!("Failed to create response node: {}", e) };
516 return;
517 }
518 }
519 } else {
520 match storage.arbor().node_create_external(
521 &cone.head.tree_id,
522 Some(user_node_id),
523 assistant_handle,
524 None,
525 ).await {
526 Ok(id) => id,
527 Err(e) => {
528 yield ChatEvent::Error { message: format!("Failed to create response node: {}", e) };
529 return;
530 }
531 }
532 };
533
534 let new_head = user_position.advance(response_node_id);
535
536 if !is_ephemeral {
538 if let Err(e) = storage.cone_update_head(&cone_id, response_node_id).await {
539 yield ChatEvent::Error { message: format!("Failed to update head: {}", e.to_string()) };
540 return;
541 }
542 }
543
544 let usage_info = if input_tokens.is_some() || output_tokens.is_some() {
545 Some(ChatUsage {
546 input_tokens: input_tokens.map(|t| t as u64),
547 output_tokens: output_tokens.map(|t| t as u64),
548 total_tokens: input_tokens.and_then(|i| output_tokens.map(|o| (i + o) as u64)),
549 })
550 } else {
551 None
552 };
553
554 yield ChatEvent::Complete {
556 cone_id,
557 new_head: if is_ephemeral { cone.head } else { new_head },
558 usage: usage_info,
559 };
560 }
561 }
562
563 #[plexus_macros::method(params(
565 identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')",
566 node_id = "UUID of the target node to set as the new head"
567 ))]
568 async fn set_head(
569 &self,
570 identifier: ConeIdentifier,
571 node_id: NodeId,
572 ) -> impl Stream<Item = SetHeadResult> + Send + 'static {
573 let storage = self.storage.clone();
574
575 stream! {
576 let cone_id = match storage.resolve_cone_identifier(&identifier).await {
578 Ok(id) => id,
579 Err(e) => {
580 yield SetHeadResult::Error { message: e.to_string() };
581 return;
582 }
583 };
584
585 let old_head = match storage.cone_get(&cone_id).await {
587 Ok(cone) => cone.head,
588 Err(e) => {
589 yield SetHeadResult::Error { message: e.to_string() };
590 return;
591 }
592 };
593
594 let new_head = old_head.advance(node_id);
596
597 match storage.cone_update_head(&cone_id, node_id).await {
598 Ok(()) => {
599 yield SetHeadResult::Updated {
600 cone_id,
601 old_head,
602 new_head,
603 };
604 }
605 Err(e) => {
606 yield SetHeadResult::Error { message: e.to_string() };
607 }
608 }
609 }
610 }
611
612 #[plexus_macros::method]
614 async fn registry(&self) -> impl Stream<Item = RegistryResult> + Send + 'static {
615 let llm_registry = self.llm_registry.clone();
616
617 stream! {
618 let export = llm_registry.export();
619 yield RegistryResult::Registry(export);
620 }
621 }
622}
623
624async fn resolve_context_to_messages(
626 storage: &ConeStorage,
627 nodes: &[Node],
628 _system_prompt: &Option<String>,
629) -> Result<Vec<Message>, String> {
630 let mut messages = Vec::new();
631
632 for node in nodes {
633 match &node.data {
634 NodeType::Text { content } => {
635 if !content.is_empty() {
638 messages.push(Message::user(content));
639 }
640 }
641 NodeType::External { handle } => {
642 if handle.plugin_id == Cone::<NoParent>::PLUGIN_ID {
645 let identifier = handle.meta.join(":");
647 let msg = storage
648 .resolve_message_handle(&identifier)
649 .await
650 .map_err(|e| format!("Failed to resolve message handle: {}", e.to_string()))?;
651
652 let cllient_msg = match msg.role {
653 MessageRole::User => Message::user(&msg.content),
654 MessageRole::Assistant => Message::assistant(&msg.content),
655 MessageRole::System => Message::system(&msg.content),
656 };
657 messages.push(cllient_msg);
658 } else if handle.plugin_id == Bash::PLUGIN_ID {
659 let cmd_id = handle.meta.first().map(|s| s.as_str()).unwrap_or("unknown");
661 messages.push(Message::user(&format!(
662 "[Tool output from bash: {}]",
663 cmd_id
664 )));
665 } else {
666 messages.push(Message::user(&format!(
668 "[External reference: {}]",
669 handle
670 )));
671 }
672 }
673 }
674 }
675
676 Ok(messages)
677}