// plexus_substrate/activations/cone/activation.rs

1use super::methods::ConeIdentifier;
2use super::storage::{ConeStorage, ConeStorageConfig};
3use super::types::{
4    ChatEvent, ChatUsage, CreateResult, DeleteResult, GetResult,
5    ListResult, MessageRole, RegistryResult, ResolveResult, SetHeadResult,
6};
7use crate::activations::arbor::{Node, NodeId, NodeType};
8use crate::activations::bash::Bash;
9use crate::plexus::{HubContext, NoParent};
10use async_stream::stream;
11use cllient::{Message, ModelRegistry};
12use futures::Stream;
13use plexus_macros::activation;
14use std::marker::PhantomData;
15use std::sync::{Arc, OnceLock};
16
/// Cone activation - orchestrates LLM conversations with Arbor context
///
/// Generic over `P: HubContext` to allow different parent contexts:
/// - `Weak<DynamicHub>` when registered with a DynamicHub
/// - Custom context types for sub-hubs
/// - `NoParent` for standalone testing (this is the default for `P`)
///
/// Every field is behind an `Arc`, so a clone shares the same storage,
/// model registry and parent slot as the value it was cloned from.
#[derive(Clone)]
pub struct Cone<P: HubContext = NoParent> {
    /// Shared handle to the cone storage layer
    storage: Arc<ConeStorage>,
    /// Shared registry used to look up LLM models by ID
    llm_registry: Arc<ModelRegistry>,
    /// Hub reference for resolving foreign handles when walking arbor trees.
    /// Starts empty and is filled by `inject_parent`; a `OnceLock` can be set only once.
    hub: Arc<OnceLock<P>>,
    /// Zero-sized marker for the parent context type `P`
    _phantom: PhantomData<P>,
}
31
32impl<P: HubContext> Cone<P> {
33    /// Create a new Cone with a specific parent context type
34    pub async fn with_context_type(
35        config: ConeStorageConfig,
36        arbor: Arc<crate::activations::arbor::ArborStorage>,
37    ) -> Result<Self, String> {
38        let storage = ConeStorage::new(config, arbor)
39            .await
40            .map_err(|e| format!("Failed to initialize cone storage: {}", e.to_string()))?;
41
42        let llm_registry = ModelRegistry::new()
43            .map_err(|e| format!("Failed to initialize LLM registry: {}", e))?;
44
45        Ok(Self {
46            storage: Arc::new(storage),
47            llm_registry: Arc::new(llm_registry),
48            hub: Arc::new(OnceLock::new()),
49            _phantom: PhantomData,
50        })
51    }
52
53    /// Inject parent context for resolving foreign handles
54    ///
55    /// Called during hub construction (e.g., via Arc::new_cyclic for DynamicHub).
56    /// This allows Cone to resolve handles from other activations when walking arbor trees.
57    pub fn inject_parent(&self, parent: P) {
58        if self.hub.set(parent).is_err() {
59            tracing::warn!("Cone: inject_parent called but parent was already set");
60        }
61    }
62
63    /// Check if parent context has been injected
64    pub fn has_parent(&self) -> bool {
65        self.hub.get().is_some()
66    }
67
68    /// Get a reference to the parent context
69    ///
70    /// Returns None if inject_parent hasn't been called yet.
71    pub fn parent(&self) -> Option<&P> {
72        self.hub.get()
73    }
74
75    /// Get access to the underlying storage
76    ///
77    /// Useful for testing and direct storage operations.
78    pub fn storage(&self) -> &Arc<ConeStorage> {
79        &self.storage
80    }
81}
82
83/// Convenience constructor and utilities for Cone with NoParent (standalone/testing)
84impl Cone<NoParent> {
85    pub async fn new(
86        config: ConeStorageConfig,
87        arbor: Arc<crate::activations::arbor::ArborStorage>,
88    ) -> Result<Self, String> {
89        Self::with_context_type(config, arbor).await
90    }
91
92    /// Register default templates with the mustache plugin
93    ///
94    /// Call this during initialization to register Cone's default templates
95    /// for rendering resolved messages and events.
96    pub async fn register_default_templates(
97        &self,
98        mustache: &crate::activations::mustache::Mustache,
99    ) -> Result<(), String> {
100        let plugin_id = Self::PLUGIN_ID;
101
102        mustache.register_templates(plugin_id, &[
103            // Chat method - resolved message template
104            ("chat", "default", "[{{role}}] {{#name}}({{name}}) {{/name}}{{content}}"),
105            ("chat", "markdown", "**{{role}}**{{#name}} ({{name}}){{/name}}\n\n{{content}}"),
106            ("chat", "json", r#"{"role":"{{role}}","content":"{{content}}","name":"{{name}}"}"#),
107
108            // Create method - cone created event
109            ("create", "default", "Cone created: {{cone_id}} (head: {{head.tree_id}}/{{head.node_id}})"),
110
111            // List method - cone list event
112            ("list", "default", "{{#cones}}{{name}} ({{id}}) - {{model_id}}\n{{/cones}}"),
113        ]).await
114    }
115}
116
117impl<P: HubContext> Cone<P> {
118    /// Resolve a cone handle to its message content
119    ///
120    /// Called by the macro-generated resolve_handle method.
121    /// Handle format: cone@1.0.0::chat:msg-{uuid}:{role}:{name}
122    pub async fn resolve_handle_impl(
123        &self,
124        handle: &crate::types::Handle,
125    ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
126        use crate::plexus::{PlexusError, wrap_stream};
127        use async_stream::stream;
128
129        let storage = self.storage.clone();
130
131        // Join meta parts into colon-separated identifier
132        // Format: "msg-{uuid}:{role}:{name}"
133        if handle.meta.is_empty() {
134            return Err(PlexusError::ExecutionError(
135                "Cone handle missing message ID in meta".to_string()
136            ));
137        }
138        let identifier = handle.meta.join(":");
139
140        // Extract name from meta if present (for response)
141        let name = handle.meta.get(2).cloned();
142
143        let result_stream = stream! {
144            match storage.resolve_message_handle(&identifier).await {
145                Ok(message) => {
146                    yield ResolveResult::Message {
147                        id: message.id.to_string(),
148                        role: message.role.as_str().to_string(),
149                        content: message.content,
150                        model: message.model_id,
151                        name: name.unwrap_or_else(|| message.role.as_str().to_string()),
152                    };
153                }
154                Err(e) => {
155                    yield ResolveResult::Error {
156                        message: format!("Failed to resolve handle: {}", e.to_string()),
157                    };
158                }
159            }
160        };
161
162        Ok(wrap_stream(result_stream, "cone.resolve_handle", vec!["cone".into()]))
163    }
164}
165
166#[plexus_macros::activation(namespace = "cone",
167version = "1.0.0",
168description = "LLM cone with persistent conversation context",
169resolve_handle, crate_path = "plexus_core")]
170impl<P: HubContext> Cone<P> {
171    /// Create a new cone (LLM agent with persistent conversation context)
172    #[plexus_macros::method(params(
173        name = "Human-readable name for the cone",
174        model_id = "LLM model ID (e.g., 'gpt-4o-mini', 'claude-3-haiku-20240307')",
175        system_prompt = "Optional system prompt / instructions",
176        metadata = "Optional configuration metadata"
177    ))]
178    async fn create(
179        &self,
180        name: String,
181        model_id: String,
182        system_prompt: Option<String>,
183        metadata: Option<serde_json::Value>,
184    ) -> impl Stream<Item = CreateResult> + Send + 'static {
185        let storage = self.storage.clone();
186        let llm_registry = self.llm_registry.clone();
187
188        stream! {
189            // Validate model exists before creating cone
190            if let Err(e) = llm_registry.from_id(&model_id) {
191                yield CreateResult::Error {
192                    message: format!("Invalid model_id '{}': {}", model_id, e)
193                };
194                return;
195            }
196
197            match storage.cone_create(name, model_id, system_prompt, metadata).await {
198                Ok(cone) => {
199                    yield CreateResult::Created {
200                        cone_id: cone.id,
201                        head: cone.head,
202                    };
203                }
204                Err(e) => {
205                    yield CreateResult::Error { message: e.to_string() };
206                }
207            }
208        }
209    }
210
211    /// Get cone configuration by name or ID
212    #[plexus_macros::method(params(identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')"))]
213    async fn get(
214        &self,
215        identifier: ConeIdentifier,
216    ) -> impl Stream<Item = GetResult> + Send + 'static {
217        let storage = self.storage.clone();
218
219        stream! {
220            // Resolve identifier to ConeId
221            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
222                Ok(id) => id,
223                Err(e) => {
224                    yield GetResult::Error { message: e.to_string() };
225                    return;
226                }
227            };
228
229            match storage.cone_get(&cone_id).await {
230                Ok(cone) => {
231                    yield GetResult::Data { cone };
232                }
233                Err(e) => {
234                    yield GetResult::Error { message: e.to_string() };
235                }
236            }
237        }
238    }
239
240    /// List all cones
241    #[plexus_macros::method]
242    async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
243        let storage = self.storage.clone();
244
245        stream! {
246            match storage.cone_list().await {
247                Ok(cones) => {
248                    yield ListResult::List { cones };
249                }
250                Err(e) => {
251                    yield ListResult::Error { message: e.to_string() };
252                }
253            }
254        }
255    }
256
257    /// Delete a cone (associated tree is preserved)
258    #[plexus_macros::method(params(identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')"))]
259    async fn delete(
260        &self,
261        identifier: ConeIdentifier,
262    ) -> impl Stream<Item = DeleteResult> + Send + 'static {
263        let storage = self.storage.clone();
264
265        stream! {
266            // Resolve identifier to ConeId
267            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
268                Ok(id) => id,
269                Err(e) => {
270                    yield DeleteResult::Error { message: e.to_string() };
271                    return;
272                }
273            };
274
275            match storage.cone_delete(&cone_id).await {
276                Ok(()) => {
277                    yield DeleteResult::Deleted { cone_id };
278                }
279                Err(e) => {
280                    yield DeleteResult::Error { message: e.to_string() };
281                }
282            }
283        }
284    }
285
286    /// Chat with a cone - appends prompt to context, calls LLM, advances head
287    #[plexus_macros::method(streaming,
288    params(
289        identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')",
290        prompt = "User message / prompt to send to the LLM",
291        ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
292    ))]
293    async fn chat(
294        &self,
295        identifier: ConeIdentifier,
296        prompt: String,
297        ephemeral: Option<bool>,
298    ) -> impl Stream<Item = ChatEvent> + Send + 'static {
299        let storage = self.storage.clone();
300        let llm_registry = self.llm_registry.clone();
301
302        stream! {
303            let is_ephemeral = ephemeral.unwrap_or(false);
304
305            // Resolve identifier to ConeId
306            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
307                Ok(id) => id,
308                Err(e) => {
309                    yield ChatEvent::Error { message: e.to_string() };
310                    return;
311                }
312            };
313
314            // 1. Load cone config
315            let cone = match storage.cone_get(&cone_id).await {
316                Ok(a) => a,
317                Err(e) => {
318                    yield ChatEvent::Error { message: format!("Failed to get cone: {}", e.to_string()) };
319                    return;
320                }
321            };
322
323            // 2. Build context from arbor path (handles only)
324            let context_nodes = match storage.arbor().context_get_path(&cone.head.tree_id, &cone.head.node_id).await {
325                Ok(nodes) => nodes,
326                Err(e) => {
327                    yield ChatEvent::Error { message: format!("Failed to get context path: {}", e) };
328                    return;
329                }
330            };
331
332            // Resolve handles to messages
333            let messages = match resolve_context_to_messages(&storage, &context_nodes, &cone.system_prompt).await {
334                Ok(msgs) => msgs,
335                Err(e) => {
336                    yield ChatEvent::Error { message: format!("Failed to resolve context: {}", e) };
337                    return;
338                }
339            };
340
341            // 3. Store user message in cone database (ephemeral if requested)
342            let user_message = if is_ephemeral {
343                match storage.message_create_ephemeral(
344                    &cone_id,
345                    MessageRole::User,
346                    prompt.clone(),
347                    None,
348                    None,
349                    None,
350                ).await {
351                    Ok(msg) => msg,
352                    Err(e) => {
353                        yield ChatEvent::Error { message: format!("Failed to store user message: {}", e.to_string()) };
354                        return;
355                    }
356                }
357            } else {
358                match storage.message_create(
359                    &cone_id,
360                    MessageRole::User,
361                    prompt.clone(),
362                    None,
363                    None,
364                    None,
365                ).await {
366                    Ok(msg) => msg,
367                    Err(e) => {
368                        yield ChatEvent::Error { message: format!("Failed to store user message: {}", e.to_string()) };
369                        return;
370                    }
371                }
372            };
373
374            // Create external node with handle pointing to user message (ephemeral if requested)
375            let user_handle = ConeStorage::message_to_handle(&user_message, "user");
376            let user_node_id = if is_ephemeral {
377                match storage.arbor().node_create_external_ephemeral(
378                    &cone.head.tree_id,
379                    Some(cone.head.node_id),
380                    user_handle,
381                    None,
382                ).await {
383                    Ok(id) => id,
384                    Err(e) => {
385                        yield ChatEvent::Error { message: format!("Failed to create user node: {}", e) };
386                        return;
387                    }
388                }
389            } else {
390                match storage.arbor().node_create_external(
391                    &cone.head.tree_id,
392                    Some(cone.head.node_id),
393                    user_handle,
394                    None,
395                ).await {
396                    Ok(id) => id,
397                    Err(e) => {
398                        yield ChatEvent::Error { message: format!("Failed to create user node: {}", e) };
399                        return;
400                    }
401                }
402            };
403
404            let user_position = cone.head.advance(user_node_id);
405
406            // Signal chat start
407            yield ChatEvent::Start {
408                cone_id,
409                user_position,
410            };
411
412            // 4. Build LLM request with resolved messages + new user prompt
413            let mut llm_messages = messages;
414            llm_messages.push(Message::user(&prompt));
415
416            let request_builder = match llm_registry.from_id(&cone.model_id) {
417                Ok(rb) => rb,
418                Err(e) => {
419                    yield ChatEvent::Error { message: format!("Failed to create request builder: {}", e) };
420                    return;
421                }
422            };
423
424            let mut builder = request_builder;
425            if let Some(ref sys) = cone.system_prompt {
426                builder = builder.system(sys);
427            }
428            builder = builder.messages(llm_messages);
429
430            // Stream the response
431            let mut stream_result = match builder.stream().await {
432                Ok(s) => s,
433                Err(e) => {
434                    yield ChatEvent::Error { message: format!("Failed to start LLM stream: {}", e) };
435                    return;
436                }
437            };
438
439            let mut full_response = String::new();
440            let mut input_tokens: Option<i64> = None;
441            let mut output_tokens: Option<i64> = None;
442
443            use futures::StreamExt;
444            while let Some(event) = stream_result.next().await {
445                match event {
446                    Ok(cllient::streaming::StreamEvent::Content(text)) => {
447                        full_response.push_str(&text);
448                        yield ChatEvent::Content {
449                            cone_id,
450                            content: text,
451                        };
452                    }
453                    Ok(cllient::streaming::StreamEvent::Usage { input_tokens: inp, output_tokens: out, .. }) => {
454                        input_tokens = inp.map(|t| t as i64);
455                        output_tokens = out.map(|t| t as i64);
456                    }
457                    Ok(cllient::streaming::StreamEvent::Error(e)) => {
458                        yield ChatEvent::Error { message: format!("LLM error: {}", e) };
459                        return;
460                    }
461                    Ok(_) => {
462                        // Ignore other events (Start, Finish, Role, Raw)
463                    }
464                    Err(e) => {
465                        yield ChatEvent::Error { message: format!("Stream error: {}", e) };
466                        return;
467                    }
468                }
469            }
470
471            // 5. Store assistant response in cone database (ephemeral if requested)
472            let assistant_message = if is_ephemeral {
473                match storage.message_create_ephemeral(
474                    &cone_id,
475                    MessageRole::Assistant,
476                    full_response,
477                    Some(cone.model_id.clone()),
478                    input_tokens,
479                    output_tokens,
480                ).await {
481                    Ok(msg) => msg,
482                    Err(e) => {
483                        yield ChatEvent::Error { message: format!("Failed to store assistant message: {}", e.to_string()) };
484                        return;
485                    }
486                }
487            } else {
488                match storage.message_create(
489                    &cone_id,
490                    MessageRole::Assistant,
491                    full_response,
492                    Some(cone.model_id.clone()),
493                    input_tokens,
494                    output_tokens,
495                ).await {
496                    Ok(msg) => msg,
497                    Err(e) => {
498                        yield ChatEvent::Error { message: format!("Failed to store assistant message: {}", e.to_string()) };
499                        return;
500                    }
501                }
502            };
503
504            // Create external node with handle pointing to assistant message (ephemeral if requested)
505            let assistant_handle = ConeStorage::message_to_handle(&assistant_message, &cone.name);
506            let response_node_id = if is_ephemeral {
507                match storage.arbor().node_create_external_ephemeral(
508                    &cone.head.tree_id,
509                    Some(user_node_id),
510                    assistant_handle,
511                    None,
512                ).await {
513                    Ok(id) => id,
514                    Err(e) => {
515                        yield ChatEvent::Error { message: format!("Failed to create response node: {}", e) };
516                        return;
517                    }
518                }
519            } else {
520                match storage.arbor().node_create_external(
521                    &cone.head.tree_id,
522                    Some(user_node_id),
523                    assistant_handle,
524                    None,
525                ).await {
526                    Ok(id) => id,
527                    Err(e) => {
528                        yield ChatEvent::Error { message: format!("Failed to create response node: {}", e) };
529                        return;
530                    }
531                }
532            };
533
534            let new_head = user_position.advance(response_node_id);
535
536            // 6. Update canonical_head (skip for ephemeral)
537            if !is_ephemeral {
538                if let Err(e) = storage.cone_update_head(&cone_id, response_node_id).await {
539                    yield ChatEvent::Error { message: format!("Failed to update head: {}", e.to_string()) };
540                    return;
541                }
542            }
543
544            let usage_info = if input_tokens.is_some() || output_tokens.is_some() {
545                Some(ChatUsage {
546                    input_tokens: input_tokens.map(|t| t as u64),
547                    output_tokens: output_tokens.map(|t| t as u64),
548                    total_tokens: input_tokens.and_then(|i| output_tokens.map(|o| (i + o) as u64)),
549                })
550            } else {
551                None
552            };
553
554            // For ephemeral, return original head (not the ephemeral node)
555            yield ChatEvent::Complete {
556                cone_id,
557                new_head: if is_ephemeral { cone.head } else { new_head },
558                usage: usage_info,
559            };
560        }
561    }
562
563    /// Move cone's canonical head to a different node in the tree
564    #[plexus_macros::method(params(
565        identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')",
566        node_id = "UUID of the target node to set as the new head"
567    ))]
568    async fn set_head(
569        &self,
570        identifier: ConeIdentifier,
571        node_id: NodeId,
572    ) -> impl Stream<Item = SetHeadResult> + Send + 'static {
573        let storage = self.storage.clone();
574
575        stream! {
576            // Resolve identifier to ConeId
577            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
578                Ok(id) => id,
579                Err(e) => {
580                    yield SetHeadResult::Error { message: e.to_string() };
581                    return;
582                }
583            };
584
585            // Get current head first
586            let old_head = match storage.cone_get(&cone_id).await {
587                Ok(cone) => cone.head,
588                Err(e) => {
589                    yield SetHeadResult::Error { message: e.to_string() };
590                    return;
591                }
592            };
593
594            // Advance to new node in same tree
595            let new_head = old_head.advance(node_id);
596
597            match storage.cone_update_head(&cone_id, node_id).await {
598                Ok(()) => {
599                    yield SetHeadResult::Updated {
600                        cone_id,
601                        old_head,
602                        new_head,
603                    };
604                }
605                Err(e) => {
606                    yield SetHeadResult::Error { message: e.to_string() };
607                }
608            }
609        }
610    }
611
612    /// Get available LLM services and models
613    #[plexus_macros::method]
614    async fn registry(&self) -> impl Stream<Item = RegistryResult> + Send + 'static {
615        let llm_registry = self.llm_registry.clone();
616
617        stream! {
618            let export = llm_registry.export();
619            yield RegistryResult::Registry(export);
620        }
621    }
622}
623
624/// Resolve arbor context path to cllient messages by resolving handles
625async fn resolve_context_to_messages(
626    storage: &ConeStorage,
627    nodes: &[Node],
628    _system_prompt: &Option<String>,
629) -> Result<Vec<Message>, String> {
630    let mut messages = Vec::new();
631
632    for node in nodes {
633        match &node.data {
634            NodeType::Text { content } => {
635                // Text nodes shouldn't exist in the new design, but handle gracefully
636                // Skip empty root nodes
637                if !content.is_empty() {
638                    messages.push(Message::user(content));
639                }
640            }
641            NodeType::External { handle } => {
642                // Resolve handle based on plugin_id
643                // Use Cone::<NoParent> to access the const (same for all P)
644                if handle.plugin_id == Cone::<NoParent>::PLUGIN_ID {
645                    // Resolve cone message handle - format: "msg-{uuid}:{role}:{name}"
646                    let identifier = handle.meta.join(":");
647                    let msg = storage
648                        .resolve_message_handle(&identifier)
649                        .await
650                        .map_err(|e| format!("Failed to resolve message handle: {}", e.to_string()))?;
651
652                    let cllient_msg = match msg.role {
653                        MessageRole::User => Message::user(&msg.content),
654                        MessageRole::Assistant => Message::assistant(&msg.content),
655                        MessageRole::System => Message::system(&msg.content),
656                    };
657                    messages.push(cllient_msg);
658                } else if handle.plugin_id == Bash::PLUGIN_ID {
659                    // TODO: Resolve bash output when bash plugin integration is added
660                    let cmd_id = handle.meta.first().map(|s| s.as_str()).unwrap_or("unknown");
661                    messages.push(Message::user(&format!(
662                        "[Tool output from bash: {}]",
663                        cmd_id
664                    )));
665                } else {
666                    // Unknown handle plugin - include as reference using Display
667                    messages.push(Message::user(&format!(
668                        "[External reference: {}]",
669                        handle
670                    )));
671                }
672            }
673        }
674    }
675
676    Ok(messages)
677}