Skip to main content

plexus_substrate/activations/cone/
activation.rs

1use super::methods::ConeIdentifier;
2use super::storage::{ConeStorage, ConeStorageConfig};
3use super::types::{
4    ChatEvent, ChatUsage, CreateResult, DeleteResult, GetResult,
5    ListResult, MessageRole, RegistryResult, ResolveResult, SetHeadResult,
6};
7use crate::activations::arbor::{Node, NodeId, NodeType};
8use crate::activations::bash::Bash;
9use crate::plexus::{HubContext, NoParent};
10use async_stream::stream;
11use cllient::{Message, ModelRegistry};
12use futures::Stream;
13use plexus_macros::hub_methods;
14use std::marker::PhantomData;
15use std::sync::{Arc, OnceLock};
16
17/// Cone activation - orchestrates LLM conversations with Arbor context
18///
19/// Generic over `P: HubContext` to allow different parent contexts:
20/// - `Weak<DynamicHub>` when registered with a DynamicHub
21/// - Custom context types for sub-hubs
22/// - `NoParent` for standalone testing
23#[derive(Clone)]
24pub struct Cone<P: HubContext = NoParent> {
25    storage: Arc<ConeStorage>,
26    llm_registry: Arc<ModelRegistry>,
27    /// Hub reference for resolving foreign handles when walking arbor trees
28    hub: Arc<OnceLock<P>>,
29    _phantom: PhantomData<P>,
30}
31
32impl<P: HubContext> Cone<P> {
33    /// Create a new Cone with a specific parent context type
34    pub async fn with_context_type(
35        config: ConeStorageConfig,
36        arbor: Arc<crate::activations::arbor::ArborStorage>,
37    ) -> Result<Self, String> {
38        let storage = ConeStorage::new(config, arbor)
39            .await
40            .map_err(|e| format!("Failed to initialize cone storage: {}", e.message))?;
41
42        let llm_registry = ModelRegistry::new()
43            .map_err(|e| format!("Failed to initialize LLM registry: {}", e))?;
44
45        Ok(Self {
46            storage: Arc::new(storage),
47            llm_registry: Arc::new(llm_registry),
48            hub: Arc::new(OnceLock::new()),
49            _phantom: PhantomData,
50        })
51    }
52
53    /// Inject parent context for resolving foreign handles
54    ///
55    /// Called during hub construction (e.g., via Arc::new_cyclic for DynamicHub).
56    /// This allows Cone to resolve handles from other activations when walking arbor trees.
57    pub fn inject_parent(&self, parent: P) {
58        let _ = self.hub.set(parent);
59    }
60
61    /// Check if parent context has been injected
62    pub fn has_parent(&self) -> bool {
63        self.hub.get().is_some()
64    }
65
66    /// Get a reference to the parent context
67    ///
68    /// Returns None if inject_parent hasn't been called yet.
69    pub fn parent(&self) -> Option<&P> {
70        self.hub.get()
71    }
72
73    /// Get access to the underlying storage
74    ///
75    /// Useful for testing and direct storage operations.
76    pub fn storage(&self) -> &Arc<ConeStorage> {
77        &self.storage
78    }
79}
80
81/// Convenience constructor and utilities for Cone with NoParent (standalone/testing)
82impl Cone<NoParent> {
83    pub async fn new(
84        config: ConeStorageConfig,
85        arbor: Arc<crate::activations::arbor::ArborStorage>,
86    ) -> Result<Self, String> {
87        Self::with_context_type(config, arbor).await
88    }
89
90    /// Register default templates with the mustache plugin
91    ///
92    /// Call this during initialization to register Cone's default templates
93    /// for rendering resolved messages and events.
94    pub async fn register_default_templates(
95        &self,
96        mustache: &crate::activations::mustache::Mustache,
97    ) -> Result<(), String> {
98        let plugin_id = Self::PLUGIN_ID;
99
100        mustache.register_templates(plugin_id, &[
101            // Chat method - resolved message template
102            ("chat", "default", "[{{role}}] {{#name}}({{name}}) {{/name}}{{content}}"),
103            ("chat", "markdown", "**{{role}}**{{#name}} ({{name}}){{/name}}\n\n{{content}}"),
104            ("chat", "json", r#"{"role":"{{role}}","content":"{{content}}","name":"{{name}}"}"#),
105
106            // Create method - cone created event
107            ("create", "default", "Cone created: {{cone_id}} (head: {{head.tree_id}}/{{head.node_id}})"),
108
109            // List method - cone list event
110            ("list", "default", "{{#cones}}{{name}} ({{id}}) - {{model_id}}\n{{/cones}}"),
111        ]).await
112    }
113}
114
115impl<P: HubContext> Cone<P> {
116    /// Resolve a cone handle to its message content
117    ///
118    /// Called by the macro-generated resolve_handle method.
119    /// Handle format: cone@1.0.0::chat:msg-{uuid}:{role}:{name}
120    pub async fn resolve_handle_impl(
121        &self,
122        handle: &crate::types::Handle,
123    ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
124        use crate::plexus::{PlexusError, wrap_stream};
125        use async_stream::stream;
126
127        let storage = self.storage.clone();
128
129        // Join meta parts into colon-separated identifier
130        // Format: "msg-{uuid}:{role}:{name}"
131        if handle.meta.is_empty() {
132            return Err(PlexusError::ExecutionError(
133                "Cone handle missing message ID in meta".to_string()
134            ));
135        }
136        let identifier = handle.meta.join(":");
137
138        // Extract name from meta if present (for response)
139        let name = handle.meta.get(2).cloned();
140
141        let result_stream = stream! {
142            match storage.resolve_message_handle(&identifier).await {
143                Ok(message) => {
144                    yield ResolveResult::Message {
145                        id: message.id.to_string(),
146                        role: message.role.as_str().to_string(),
147                        content: message.content,
148                        model: message.model_id,
149                        name: name.unwrap_or_else(|| message.role.as_str().to_string()),
150                    };
151                }
152                Err(e) => {
153                    yield ResolveResult::Error {
154                        message: format!("Failed to resolve handle: {}", e.message),
155                    };
156                }
157            }
158        };
159
160        Ok(wrap_stream(result_stream, "cone.resolve_handle", vec!["cone".into()]))
161    }
162}
163
164#[hub_methods(
165    namespace = "cone",
166    version = "1.0.0",
167    description = "LLM cone with persistent conversation context",
168    resolve_handle
169)]
170impl<P: HubContext> Cone<P> {
171    /// Create a new cone (LLM agent with persistent conversation context)
172    #[plexus_macros::hub_method(
173        params(
174            name = "Human-readable name for the cone",
175            model_id = "LLM model ID (e.g., 'gpt-4o-mini', 'claude-3-haiku-20240307')",
176            system_prompt = "Optional system prompt / instructions",
177            metadata = "Optional configuration metadata"
178        )
179    )]
180    async fn create(
181        &self,
182        name: String,
183        model_id: String,
184        system_prompt: Option<String>,
185        metadata: Option<serde_json::Value>,
186    ) -> impl Stream<Item = CreateResult> + Send + 'static {
187        let storage = self.storage.clone();
188        let llm_registry = self.llm_registry.clone();
189
190        stream! {
191            // Validate model exists before creating cone
192            if let Err(e) = llm_registry.from_id(&model_id) {
193                yield CreateResult::Error {
194                    message: format!("Invalid model_id '{}': {}", model_id, e)
195                };
196                return;
197            }
198
199            match storage.cone_create(name, model_id, system_prompt, metadata).await {
200                Ok(cone) => {
201                    yield CreateResult::Created {
202                        cone_id: cone.id,
203                        head: cone.head,
204                    };
205                }
206                Err(e) => {
207                    yield CreateResult::Error { message: e.message };
208                }
209            }
210        }
211    }
212
213    /// Get cone configuration by name or ID
214    #[plexus_macros::hub_method(
215        params(identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')")
216    )]
217    async fn get(
218        &self,
219        identifier: ConeIdentifier,
220    ) -> impl Stream<Item = GetResult> + Send + 'static {
221        let storage = self.storage.clone();
222
223        stream! {
224            // Resolve identifier to ConeId
225            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
226                Ok(id) => id,
227                Err(e) => {
228                    yield GetResult::Error { message: e.message };
229                    return;
230                }
231            };
232
233            match storage.cone_get(&cone_id).await {
234                Ok(cone) => {
235                    yield GetResult::Data { cone };
236                }
237                Err(e) => {
238                    yield GetResult::Error { message: e.message };
239                }
240            }
241        }
242    }
243
244    /// List all cones
245    #[plexus_macros::hub_method]
246    async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
247        let storage = self.storage.clone();
248
249        stream! {
250            match storage.cone_list().await {
251                Ok(cones) => {
252                    yield ListResult::List { cones };
253                }
254                Err(e) => {
255                    yield ListResult::Error { message: e.message };
256                }
257            }
258        }
259    }
260
261    /// Delete a cone (associated tree is preserved)
262    #[plexus_macros::hub_method(
263        params(identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')")
264    )]
265    async fn delete(
266        &self,
267        identifier: ConeIdentifier,
268    ) -> impl Stream<Item = DeleteResult> + Send + 'static {
269        let storage = self.storage.clone();
270
271        stream! {
272            // Resolve identifier to ConeId
273            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
274                Ok(id) => id,
275                Err(e) => {
276                    yield DeleteResult::Error { message: e.message };
277                    return;
278                }
279            };
280
281            match storage.cone_delete(&cone_id).await {
282                Ok(()) => {
283                    yield DeleteResult::Deleted { cone_id };
284                }
285                Err(e) => {
286                    yield DeleteResult::Error { message: e.message };
287                }
288            }
289        }
290    }
291
292    /// Chat with a cone - appends prompt to context, calls LLM, advances head
293    #[plexus_macros::hub_method(
294        streaming,
295        params(
296            identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')",
297            prompt = "User message / prompt to send to the LLM",
298            ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
299        )
300    )]
301    async fn chat(
302        &self,
303        identifier: ConeIdentifier,
304        prompt: String,
305        ephemeral: Option<bool>,
306    ) -> impl Stream<Item = ChatEvent> + Send + 'static {
307        let storage = self.storage.clone();
308        let llm_registry = self.llm_registry.clone();
309
310        stream! {
311            let is_ephemeral = ephemeral.unwrap_or(false);
312
313            // Resolve identifier to ConeId
314            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
315                Ok(id) => id,
316                Err(e) => {
317                    yield ChatEvent::Error { message: e.message };
318                    return;
319                }
320            };
321
322            // 1. Load cone config
323            let cone = match storage.cone_get(&cone_id).await {
324                Ok(a) => a,
325                Err(e) => {
326                    yield ChatEvent::Error { message: format!("Failed to get cone: {}", e.message) };
327                    return;
328                }
329            };
330
331            // 2. Build context from arbor path (handles only)
332            let context_nodes = match storage.arbor().context_get_path(&cone.head.tree_id, &cone.head.node_id).await {
333                Ok(nodes) => nodes,
334                Err(e) => {
335                    yield ChatEvent::Error { message: format!("Failed to get context path: {}", e) };
336                    return;
337                }
338            };
339
340            // Resolve handles to messages
341            let messages = match resolve_context_to_messages(&storage, &context_nodes, &cone.system_prompt).await {
342                Ok(msgs) => msgs,
343                Err(e) => {
344                    yield ChatEvent::Error { message: format!("Failed to resolve context: {}", e) };
345                    return;
346                }
347            };
348
349            // 3. Store user message in cone database (ephemeral if requested)
350            let user_message = if is_ephemeral {
351                match storage.message_create_ephemeral(
352                    &cone_id,
353                    MessageRole::User,
354                    prompt.clone(),
355                    None,
356                    None,
357                    None,
358                ).await {
359                    Ok(msg) => msg,
360                    Err(e) => {
361                        yield ChatEvent::Error { message: format!("Failed to store user message: {}", e.message) };
362                        return;
363                    }
364                }
365            } else {
366                match storage.message_create(
367                    &cone_id,
368                    MessageRole::User,
369                    prompt.clone(),
370                    None,
371                    None,
372                    None,
373                ).await {
374                    Ok(msg) => msg,
375                    Err(e) => {
376                        yield ChatEvent::Error { message: format!("Failed to store user message: {}", e.message) };
377                        return;
378                    }
379                }
380            };
381
382            // Create external node with handle pointing to user message (ephemeral if requested)
383            let user_handle = ConeStorage::message_to_handle(&user_message, "user");
384            let user_node_id = if is_ephemeral {
385                match storage.arbor().node_create_external_ephemeral(
386                    &cone.head.tree_id,
387                    Some(cone.head.node_id),
388                    user_handle,
389                    None,
390                ).await {
391                    Ok(id) => id,
392                    Err(e) => {
393                        yield ChatEvent::Error { message: format!("Failed to create user node: {}", e) };
394                        return;
395                    }
396                }
397            } else {
398                match storage.arbor().node_create_external(
399                    &cone.head.tree_id,
400                    Some(cone.head.node_id),
401                    user_handle,
402                    None,
403                ).await {
404                    Ok(id) => id,
405                    Err(e) => {
406                        yield ChatEvent::Error { message: format!("Failed to create user node: {}", e) };
407                        return;
408                    }
409                }
410            };
411
412            let user_position = cone.head.advance(user_node_id);
413
414            // Signal chat start
415            yield ChatEvent::Start {
416                cone_id,
417                user_position,
418            };
419
420            // 4. Build LLM request with resolved messages + new user prompt
421            let mut llm_messages = messages;
422            llm_messages.push(Message::user(&prompt));
423
424            let request_builder = match llm_registry.from_id(&cone.model_id) {
425                Ok(rb) => rb,
426                Err(e) => {
427                    yield ChatEvent::Error { message: format!("Failed to create request builder: {}", e) };
428                    return;
429                }
430            };
431
432            let mut builder = request_builder;
433            if let Some(ref sys) = cone.system_prompt {
434                builder = builder.system(sys);
435            }
436            builder = builder.messages(llm_messages);
437
438            // Stream the response
439            let mut stream_result = match builder.stream().await {
440                Ok(s) => s,
441                Err(e) => {
442                    yield ChatEvent::Error { message: format!("Failed to start LLM stream: {}", e) };
443                    return;
444                }
445            };
446
447            let mut full_response = String::new();
448            let mut input_tokens: Option<i64> = None;
449            let mut output_tokens: Option<i64> = None;
450
451            use futures::StreamExt;
452            while let Some(event) = stream_result.next().await {
453                match event {
454                    Ok(cllient::streaming::StreamEvent::Content(text)) => {
455                        full_response.push_str(&text);
456                        yield ChatEvent::Content {
457                            cone_id,
458                            content: text,
459                        };
460                    }
461                    Ok(cllient::streaming::StreamEvent::Usage { input_tokens: inp, output_tokens: out, .. }) => {
462                        input_tokens = inp.map(|t| t as i64);
463                        output_tokens = out.map(|t| t as i64);
464                    }
465                    Ok(cllient::streaming::StreamEvent::Error(e)) => {
466                        yield ChatEvent::Error { message: format!("LLM error: {}", e) };
467                        return;
468                    }
469                    Ok(_) => {
470                        // Ignore other events (Start, Finish, Role, Raw)
471                    }
472                    Err(e) => {
473                        yield ChatEvent::Error { message: format!("Stream error: {}", e) };
474                        return;
475                    }
476                }
477            }
478
479            // 5. Store assistant response in cone database (ephemeral if requested)
480            let assistant_message = if is_ephemeral {
481                match storage.message_create_ephemeral(
482                    &cone_id,
483                    MessageRole::Assistant,
484                    full_response,
485                    Some(cone.model_id.clone()),
486                    input_tokens,
487                    output_tokens,
488                ).await {
489                    Ok(msg) => msg,
490                    Err(e) => {
491                        yield ChatEvent::Error { message: format!("Failed to store assistant message: {}", e.message) };
492                        return;
493                    }
494                }
495            } else {
496                match storage.message_create(
497                    &cone_id,
498                    MessageRole::Assistant,
499                    full_response,
500                    Some(cone.model_id.clone()),
501                    input_tokens,
502                    output_tokens,
503                ).await {
504                    Ok(msg) => msg,
505                    Err(e) => {
506                        yield ChatEvent::Error { message: format!("Failed to store assistant message: {}", e.message) };
507                        return;
508                    }
509                }
510            };
511
512            // Create external node with handle pointing to assistant message (ephemeral if requested)
513            let assistant_handle = ConeStorage::message_to_handle(&assistant_message, &cone.name);
514            let response_node_id = if is_ephemeral {
515                match storage.arbor().node_create_external_ephemeral(
516                    &cone.head.tree_id,
517                    Some(user_node_id),
518                    assistant_handle,
519                    None,
520                ).await {
521                    Ok(id) => id,
522                    Err(e) => {
523                        yield ChatEvent::Error { message: format!("Failed to create response node: {}", e) };
524                        return;
525                    }
526                }
527            } else {
528                match storage.arbor().node_create_external(
529                    &cone.head.tree_id,
530                    Some(user_node_id),
531                    assistant_handle,
532                    None,
533                ).await {
534                    Ok(id) => id,
535                    Err(e) => {
536                        yield ChatEvent::Error { message: format!("Failed to create response node: {}", e) };
537                        return;
538                    }
539                }
540            };
541
542            let new_head = user_position.advance(response_node_id);
543
544            // 6. Update canonical_head (skip for ephemeral)
545            if !is_ephemeral {
546                if let Err(e) = storage.cone_update_head(&cone_id, response_node_id).await {
547                    yield ChatEvent::Error { message: format!("Failed to update head: {}", e.message) };
548                    return;
549                }
550            }
551
552            let usage_info = if input_tokens.is_some() || output_tokens.is_some() {
553                Some(ChatUsage {
554                    input_tokens: input_tokens.map(|t| t as u64),
555                    output_tokens: output_tokens.map(|t| t as u64),
556                    total_tokens: input_tokens.and_then(|i| output_tokens.map(|o| (i + o) as u64)),
557                })
558            } else {
559                None
560            };
561
562            // For ephemeral, return original head (not the ephemeral node)
563            yield ChatEvent::Complete {
564                cone_id,
565                new_head: if is_ephemeral { cone.head } else { new_head },
566                usage: usage_info,
567            };
568        }
569    }
570
571    /// Move cone's canonical head to a different node in the tree
572    #[plexus_macros::hub_method(
573        params(
574            identifier = "Cone name or UUID (e.g., 'my-assistant' or '550e8400-e29b-...')",
575            node_id = "UUID of the target node to set as the new head"
576        )
577    )]
578    async fn set_head(
579        &self,
580        identifier: ConeIdentifier,
581        node_id: NodeId,
582    ) -> impl Stream<Item = SetHeadResult> + Send + 'static {
583        let storage = self.storage.clone();
584
585        stream! {
586            // Resolve identifier to ConeId
587            let cone_id = match storage.resolve_cone_identifier(&identifier).await {
588                Ok(id) => id,
589                Err(e) => {
590                    yield SetHeadResult::Error { message: e.message };
591                    return;
592                }
593            };
594
595            // Get current head first
596            let old_head = match storage.cone_get(&cone_id).await {
597                Ok(cone) => cone.head,
598                Err(e) => {
599                    yield SetHeadResult::Error { message: e.message };
600                    return;
601                }
602            };
603
604            // Advance to new node in same tree
605            let new_head = old_head.advance(node_id);
606
607            match storage.cone_update_head(&cone_id, node_id).await {
608                Ok(()) => {
609                    yield SetHeadResult::Updated {
610                        cone_id,
611                        old_head,
612                        new_head,
613                    };
614                }
615                Err(e) => {
616                    yield SetHeadResult::Error { message: e.message };
617                }
618            }
619        }
620    }
621
622    /// Get available LLM services and models
623    #[plexus_macros::hub_method]
624    async fn registry(&self) -> impl Stream<Item = RegistryResult> + Send + 'static {
625        let llm_registry = self.llm_registry.clone();
626
627        stream! {
628            let export = llm_registry.export();
629            yield RegistryResult::Registry(export);
630        }
631    }
632}
633
634/// Resolve arbor context path to cllient messages by resolving handles
635async fn resolve_context_to_messages(
636    storage: &ConeStorage,
637    nodes: &[Node],
638    _system_prompt: &Option<String>,
639) -> Result<Vec<Message>, String> {
640    let mut messages = Vec::new();
641
642    for node in nodes {
643        match &node.data {
644            NodeType::Text { content } => {
645                // Text nodes shouldn't exist in the new design, but handle gracefully
646                // Skip empty root nodes
647                if !content.is_empty() {
648                    messages.push(Message::user(content));
649                }
650            }
651            NodeType::External { handle } => {
652                // Resolve handle based on plugin_id
653                // Use Cone::<NoParent> to access the const (same for all P)
654                if handle.plugin_id == Cone::<NoParent>::PLUGIN_ID {
655                    // Resolve cone message handle - format: "msg-{uuid}:{role}:{name}"
656                    let identifier = handle.meta.join(":");
657                    let msg = storage
658                        .resolve_message_handle(&identifier)
659                        .await
660                        .map_err(|e| format!("Failed to resolve message handle: {}", e.message))?;
661
662                    let cllient_msg = match msg.role {
663                        MessageRole::User => Message::user(&msg.content),
664                        MessageRole::Assistant => Message::assistant(&msg.content),
665                        MessageRole::System => Message::system(&msg.content),
666                    };
667                    messages.push(cllient_msg);
668                } else if handle.plugin_id == Bash::PLUGIN_ID {
669                    // TODO: Resolve bash output when bash plugin integration is added
670                    let cmd_id = handle.meta.first().map(|s| s.as_str()).unwrap_or("unknown");
671                    messages.push(Message::user(&format!(
672                        "[Tool output from bash: {}]",
673                        cmd_id
674                    )));
675                } else {
676                    // Unknown handle plugin - include as reference using Display
677                    messages.push(Message::user(&format!(
678                        "[External reference: {}]",
679                        handle
680                    )));
681                }
682            }
683        }
684    }
685
686    Ok(messages)
687}