agent_stream_kit/
stream.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4use crate::askit::ASKit;
5use crate::error::AgentError;
6use crate::id::{new_id, update_ids};
7use crate::spec::AgentStreamSpec;
8use crate::{AgentSpec, ChannelSpec, FnvIndexMap};
9
/// Insertion-ordered map of [`AgentStream`] values keyed by `String`.
///
/// NOTE(review): the key is presumably the stream id — confirm at the call sites.
10pub type AgentStreams = FnvIndexMap<String, AgentStream>;
11
12pub struct AgentStream {
13    id: String,
14
15    name: String,
16
17    running: bool,
18
19    spec: AgentStreamSpec,
20}
21
22impl AgentStream {
23    /// Create a new agent stream with the given name and spec.
24    ///
25    /// The ids of the given spec, including agents and channels, are changed to new unique ids.
26    pub fn new(name: String, mut spec: AgentStreamSpec) -> Self {
27        let (agents, channels) = update_ids(&spec.agents, &spec.channels);
28        spec.agents = agents;
29        spec.channels = channels;
30
31        Self {
32            id: new_id(),
33            name,
34            running: false,
35            spec,
36        }
37    }
38
39    pub fn id(&self) -> &str {
40        &self.id
41    }
42
43    pub fn spec(&self) -> &AgentStreamSpec {
44        &self.spec
45    }
46
47    pub fn update_spec(&mut self, value: &Value) -> Result<(), AgentError> {
48        let update_map = value
49            .as_object()
50            .ok_or_else(|| AgentError::SerializationError("Expected JSON object".to_string()))?;
51
52        for (k, v) in update_map {
53            match k.as_str() {
54                "agents" => {
55                    // just ignore
56                }
57                "channels" => {
58                    // just ignore
59                }
60                _ => {
61                    // Update extensions
62                    self.spec.extensions.insert(k.clone(), v.clone());
63                }
64            }
65        }
66        Ok(())
67    }
68
69    pub fn name(&self) -> &str {
70        &self.name
71    }
72
73    pub fn set_name(&mut self, name: String) {
74        self.name = name;
75    }
76
77    pub fn add_agent(&mut self, agent: AgentSpec) {
78        self.spec.add_agent(agent);
79    }
80
81    pub fn remove_agent(&mut self, agent_id: &str) {
82        self.spec.remove_agent(agent_id);
83    }
84
85    pub fn add_channel(&mut self, channel: ChannelSpec) {
86        self.spec.add_channel(channel);
87    }
88
89    pub fn remove_channel(&mut self, channel: &ChannelSpec) -> Option<ChannelSpec> {
90        self.spec.remove_channel(channel)
91    }
92
93    pub fn running(&self) -> bool {
94        self.running
95    }
96
97    pub async fn start(&mut self, askit: &ASKit) -> Result<(), AgentError> {
98        if self.running {
99            // Already running
100            return Ok(());
101        }
102        self.running = true;
103
104        for agent in self.spec.agents.iter() {
105            if agent.disabled {
106                continue;
107            }
108            askit.start_agent(&agent.id).await.unwrap_or_else(|e| {
109                log::error!("Failed to start agent {}: {}", agent.id, e);
110            });
111        }
112
113        Ok(())
114    }
115
116    pub async fn stop(&mut self, askit: &ASKit) -> Result<(), AgentError> {
117        for agent in self.spec.agents.iter() {
118            askit.stop_agent(&agent.id).await.unwrap_or_else(|e| {
119                log::error!("Failed to stop agent {}: {}", agent.id, e);
120            });
121        }
122        self.running = false;
123        Ok(())
124    }
125}
126
127#[derive(Clone, Debug, Serialize, Deserialize)]
128pub struct AgentStreamInfo {
129    pub id: String,
130    pub name: String,
131    pub running: bool,
132}
133
134impl From<&AgentStream> for AgentStreamInfo {
135    fn from(stream: &AgentStream) -> Self {
136        Self {
137            id: stream.id.clone(),
138            name: stream.name.clone(),
139            running: stream.running,
140        }
141    }
142}