ruchy/
actors.rs

1//! MCP-compatible Actor system implementation
2//!
3//! Based on SPECIFICATION.md section 7: MCP Message-Passing Architecture
4
5use anyhow::Result;
6use serde::{Deserialize, Serialize};
7use std::fmt;
8use std::time::Duration;
9use tokio::sync::mpsc;
10use tracing;
11
12/// Core Actor trait compatible with MCP message passing
13#[async_trait::async_trait]
14pub trait Actor: Send + Sync + 'static + Sized {
15    type Message: McpSerializable + Send + 'static;
16    type Response: McpSerializable + Send + 'static;
17
18    async fn receive(&mut self, msg: Self::Message) -> Option<Self::Response>;
19
20    /// Spawn the actor and return a handle to communicate with it
21    fn spawn(mut self) -> ActorHandle<Self::Message, Self::Response> {
22        let (tx, mut rx) = mpsc::channel::<(Self::Message, mpsc::Sender<Self::Response>)>(100);
23
24        tokio::spawn(async move {
25            while let Some((msg, reply_tx)) = rx.recv().await {
26                let response = self.receive(msg).await;
27                if let Some(resp) = response {
28                    let _ = reply_tx.send(resp).await;
29                }
30            }
31        });
32
33        ActorHandle { tx }
34    }
35}
36
37/// Trait for MCP serializable messages
38pub trait McpSerializable: Serialize + for<'de> Deserialize<'de> + fmt::Debug + Clone {}
39
40// Blanket implementation for types that satisfy the bounds
41impl<T> McpSerializable for T where T: Serialize + for<'de> Deserialize<'de> + fmt::Debug + Clone {}
42
/// Handle for communicating with an actor
///
/// Wraps the sending half of the actor's mailbox. Every queued item pairs a
/// message of type `M` with a sender on which a reply of type `R` can be
/// delivered.
pub struct ActorHandle<M, R> {
    // Sending half of the mailbox channel; `Actor::spawn` keeps the receiver
    tx: mpsc::Sender<(M, mpsc::Sender<R>)>,
}
47
48impl<M, R> ActorHandle<M, R>
49where
50    M: McpSerializable + Send + 'static,
51    R: McpSerializable + Send + 'static,
52{
53    /// Send a message to the actor without waiting for response
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the actor has stopped and can no longer receive messages
58    pub async fn send(&self, msg: M) -> Result<()> {
59        let (reply_tx, _) = mpsc::channel::<R>(1);
60        self.tx
61            .send((msg, reply_tx))
62            .await
63            .map_err(|_| anyhow::anyhow!("Actor has stopped"))?;
64        Ok(())
65    }
66
67    /// Send a message and wait for response
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the actor has stopped or does not respond
72    pub async fn ask(&self, msg: M) -> Result<R> {
73        let (reply_tx, mut reply_rx) = mpsc::channel::<R>(1);
74
75        self.tx
76            .send((msg, reply_tx))
77            .await
78            .map_err(|_| anyhow::anyhow!("Actor has stopped"))?;
79
80        reply_rx
81            .recv()
82            .await
83            .ok_or_else(|| anyhow::anyhow!("No response received"))
84    }
85
86    /// Check if the actor is still alive
87    pub fn is_alive(&self) -> bool {
88        !self.tx.is_closed()
89    }
90}
91
92/// MCP protocol message structure
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct McpMessage {
95    pub jsonrpc: String,
96    pub method: String,
97    pub params: serde_json::Value,
98    pub id: Option<String>,
99}
100
101/// MCP protocol response structure
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct McpResponse {
104    pub jsonrpc: String,
105    pub result: Option<serde_json::Value>,
106    pub error: Option<McpError>,
107    pub id: Option<String>,
108}
109
/// Error object carried in the `error` field of an [`McpResponse`]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpError {
    /// Numeric error code; this file uses `-32601` for unknown methods and tools
    pub code: i32,
    /// Human-readable description of the failure
    pub message: String,
    /// Optional additional details about the error
    pub data: Option<serde_json::Value>,
}
116
/// MCP-compatible actor for handling protocol messages
///
/// Derives `Debug` and `Clone` so the actor can be logged and duplicated
/// before it is spawned.
#[derive(Debug, Clone)]
pub struct McpActor {
    /// Names of the tools advertised in response to `tools/list`
    pub tools: Vec<String>,
}
121
122impl McpActor {
123    /// Create a new MCP actor with default tools
124    ///
125    /// # Examples
126    ///
127    /// ```
128    /// use ruchy::McpActor;
129    ///
130    /// let actor = McpActor::new();
131    /// // Actor starts with three default tools
132    /// assert_eq!(actor.tools.len(), 3);
133    /// ```
134    pub fn new() -> Self {
135        Self {
136            tools: vec![
137                "transpile".to_string(),
138                "parse".to_string(),
139                "analyze".to_string(),
140            ],
141        }
142    }
143
144    fn list_tools(&self) -> McpResponse {
145        McpResponse {
146            jsonrpc: "2.0".to_string(),
147            result: Some(serde_json::json!({
148                "tools": self.tools.iter().map(|name| {
149                    serde_json::json!({
150                        "name": name,
151                        "description": format!("Ruchy {name} tool")
152                    })
153                }).collect::<Vec<_>>()
154            })),
155            error: None,
156            id: None,
157        }
158    }
159
160    fn call_tool(params: &serde_json::Value) -> Option<McpResponse> {
161        // Extract tool name and arguments from params
162        let tool_name = params.get("name")?.as_str()?;
163
164        let result = match tool_name {
165            "transpile" => {
166                serde_json::json!({
167                    "content": [
168                        {
169                            "type": "text",
170                            "text": "Transpilation functionality placeholder"
171                        }
172                    ]
173                })
174            }
175            "parse" => {
176                serde_json::json!({
177                    "content": [
178                        {
179                            "type": "text",
180                            "text": "Parsing functionality placeholder"
181                        }
182                    ]
183                })
184            }
185            "analyze" => {
186                serde_json::json!({
187                    "content": [
188                        {
189                            "type": "text",
190                            "text": "Analysis functionality placeholder"
191                        }
192                    ]
193                })
194            }
195            _ => {
196                return Some(McpResponse {
197                    jsonrpc: "2.0".to_string(),
198                    result: None,
199                    error: Some(McpError {
200                        code: -32601,
201                        message: format!("Unknown tool: {tool_name}"),
202                        data: None,
203                    }),
204                    id: None,
205                });
206            }
207        };
208
209        Some(McpResponse {
210            jsonrpc: "2.0".to_string(),
211            result: Some(result),
212            error: None,
213            id: None,
214        })
215    }
216}
217
// `Default` yields the same actor as `McpActor::new`
impl Default for McpActor {
    /// Create an actor by delegating to `McpActor::new`
    fn default() -> Self {
        Self::new()
    }
}
223
224#[async_trait::async_trait]
225impl Actor for McpActor {
226    type Message = McpMessage;
227    type Response = McpResponse;
228
229    async fn receive(&mut self, msg: McpMessage) -> Option<McpResponse> {
230        match msg.method.as_str() {
231            "tools/list" => Some(self.list_tools()),
232            "tools/call" => Self::call_tool(&msg.params),
233            _ => Some(McpResponse {
234                jsonrpc: "2.0".to_string(),
235                result: None,
236                error: Some(McpError {
237                    code: -32601,
238                    message: format!("Unknown method: {method}", method = msg.method),
239                    data: None,
240                }),
241                id: msg.id,
242            }),
243        }
244    }
245}
246
/// Supervision strategies for actor fault tolerance
///
/// Derives `PartialEq`/`Eq` so strategies can be compared directly
/// (for example with `assert_eq!`) instead of only through pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SupervisionStrategy {
    /// Restart only the failed child
    OneForOne,
    /// Restart all children when one fails
    OneForAll,
    /// Restart the failed child and all children started after it
    RestForOne,
}
257
/// Supervisor for managing actor lifecycles
///
/// Holds the handles of the actors it has spawned together with the strategy
/// consulted when one of them is found to be dead.
pub struct Supervisor<A: Actor> {
    // Handles of the supervised actors, in the order they were added
    children: Vec<ActorHandle<A::Message, A::Response>>,
    // Strategy consulted by `monitor` when a child is no longer alive
    strategy: SupervisionStrategy,
}
263
264impl<A: Actor> Supervisor<A> {
265    pub fn new(strategy: SupervisionStrategy) -> Self {
266        Self {
267            children: Vec::new(),
268            strategy,
269        }
270    }
271
272    pub fn supervise(&mut self, actor: A) {
273        let handle = actor.spawn();
274        self.children.push(handle);
275    }
276
277    pub async fn monitor(&mut self) {
278        // Monitoring implementation would go here
279        // For now, just check if actors are alive periodically
280        loop {
281            for (i, child) in self.children.iter().enumerate() {
282                if !child.is_alive() {
283                    match self.strategy {
284                        SupervisionStrategy::OneForOne => {
285                            tracing::warn!("Child actor {i} died, would restart in production");
286                        }
287                        SupervisionStrategy::OneForAll => {
288                            tracing::warn!("Child actor {i} died, would restart all in production");
289                        }
290                        SupervisionStrategy::RestForOne => {
291                            tracing::warn!(
292                                "Child actor {i} died, would restart from {i} in production"
293                            );
294                        }
295                    }
296                }
297            }
298
299            tokio::time::sleep(Duration::from_secs(1)).await;
300        }
301    }
302}
303
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal request type used to exercise the generic actor plumbing
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestMessage {
        content: String,
    }

    // Minimal response type paired with `TestMessage`
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestResponse {
        echo: String,
    }

    // Stateless actor that replies with the request content prefixed by "Echo: "
    struct EchoActor;

    #[async_trait::async_trait]
    impl Actor for EchoActor {
        type Message = TestMessage;
        type Response = TestResponse;

        async fn receive(&mut self, msg: TestMessage) -> Option<TestResponse> {
            Some(TestResponse {
                echo: format!("Echo: {content}", content = msg.content),
            })
        }
    }

    // A spawned actor answers an `ask` with the value its `receive` returns
    #[tokio::test]
    async fn test_actor_spawn_and_communication() -> Result<(), Box<dyn std::error::Error>> {
        let actor = EchoActor;
        let handle = actor.spawn();

        let msg = TestMessage {
            content: "Hello, Actor!".to_string(),
        };

        let response = handle.ask(msg).await?;
        assert_eq!(response.echo, "Echo: Hello, Actor!");
        Ok(())
    }

    // Fire-and-forget delivery succeeds and leaves the actor running
    #[tokio::test]
    async fn test_actor_send_without_reply() -> Result<(), Box<dyn std::error::Error>> {
        let handle = EchoActor.spawn();

        handle
            .send(TestMessage {
                content: "fire and forget".to_string(),
            })
            .await?;

        assert!(handle.is_alive());
        Ok(())
    }

    // `tools/list` produces a result and no error
    #[tokio::test]
    async fn test_mcp_actor_list_tools() -> Result<(), Box<dyn std::error::Error>> {
        let actor = McpActor::new();
        let handle = actor.spawn();

        let msg = McpMessage {
            jsonrpc: "2.0".to_string(),
            method: "tools/list".to_string(),
            params: serde_json::Value::Null,
            id: Some("test".to_string()),
        };

        let response = handle.ask(msg).await?;
        assert!(response.result.is_some());
        assert!(response.error.is_none());
        Ok(())
    }

    // `tools/call` for a known tool produces a result and no error
    #[tokio::test]
    async fn test_mcp_actor_call_tool() -> Result<(), Box<dyn std::error::Error>> {
        let actor = McpActor::new();
        let handle = actor.spawn();

        let msg = McpMessage {
            jsonrpc: "2.0".to_string(),
            method: "tools/call".to_string(),
            params: serde_json::json!({
                "name": "transpile",
                "arguments": {}
            }),
            id: Some("test".to_string()),
        };

        let response = handle.ask(msg).await?;
        assert!(response.result.is_some());
        assert!(response.error.is_none());
        Ok(())
    }

    // An unsupported method yields a -32601 error that echoes the request id
    #[tokio::test]
    async fn test_mcp_actor_unknown_method() -> Result<(), Box<dyn std::error::Error>> {
        let handle = McpActor::new().spawn();

        let msg = McpMessage {
            jsonrpc: "2.0".to_string(),
            method: "does/not/exist".to_string(),
            params: serde_json::Value::Null,
            id: Some("test".to_string()),
        };

        let response = handle.ask(msg).await?;
        assert!(response.result.is_none());
        assert_eq!(response.id.as_deref(), Some("test"));

        let error = response.error.ok_or("expected an error response")?;
        assert_eq!(error.code, -32601);
        Ok(())
    }

    // A new supervisor stores the requested strategy and starts with no children
    #[test]
    fn test_supervision_strategy_creation() {
        let supervisor: Supervisor<EchoActor> = Supervisor::new(SupervisionStrategy::OneForOne);
        assert!(matches!(
            supervisor.strategy,
            SupervisionStrategy::OneForOne
        ));
        assert_eq!(supervisor.children.len(), 0);
    }

    // `McpMessage` survives a JSON round trip
    #[test]
    fn test_mcp_message_serialization() -> Result<(), Box<dyn std::error::Error>> {
        let msg = McpMessage {
            jsonrpc: "2.0".to_string(),
            method: "test".to_string(),
            params: serde_json::json!({"key": "value"}),
            id: Some("123".to_string()),
        };

        let serialized = serde_json::to_string(&msg)?;
        let deserialized: McpMessage = serde_json::from_str(&serialized)?;

        assert_eq!(msg.jsonrpc, deserialized.jsonrpc);
        assert_eq!(msg.method, deserialized.method);
        assert_eq!(msg.id, deserialized.id);
        Ok(())
    }
}