Skip to main content

vm_pool_protocol/
lib.rs

1//! Shared command and event type definitions for vm-pool.
2//!
3//! This crate defines the protocol used for communication between:
4//! - Host (service) ↔ VM (supervisor) over stdio
5//! - Tasks (client) ↔ vm-pool (service) over Unix socket
6
7use std::fmt;
8
9use serde::{Deserialize, Serialize};
10
11/// Strongly-typed VM identifier.
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13#[serde(transparent)]
14pub struct VmId(String);
15
16impl VmId {
17    pub fn new(id: impl Into<String>) -> Self {
18        Self(id.into())
19    }
20
21    pub fn as_str(&self) -> &str {
22        &self.0
23    }
24}
25
26impl fmt::Display for VmId {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        f.write_str(&self.0)
29    }
30}
31
32impl From<String> for VmId {
33    fn from(s: String) -> Self {
34        Self(s)
35    }
36}
37
38impl From<&str> for VmId {
39    fn from(s: &str) -> Self {
40        Self(s.to_owned())
41    }
42}
43
44/// Commands sent from host to supervisor (inside VM).
45#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
46#[serde(tag = "type", rename_all = "snake_case")]
47pub enum VmCommand {
48    /// Execute a shell command.
49    Execute { command: String },
50    /// Graceful shutdown.
51    Shutdown,
52    /// Health check ping.
53    Ping,
54}
55
56/// Events emitted by supervisor to host.
57#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
58#[serde(tag = "type", rename_all = "snake_case")]
59pub enum VmEvent {
60    /// Supervisor is ready.
61    Ready,
62    /// Command output (stdout/stderr).
63    Output { stream: OutputStream, data: String },
64    /// Command completed.
65    CommandCompleted { exit_code: i32 },
66    /// Pong response to ping.
67    Pong,
68    /// Supervisor is shutting down.
69    Shutdown,
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
73#[serde(rename_all = "snake_case")]
74pub enum OutputStream {
75    Stdout,
76    Stderr,
77}
78
79/// Stream type for log output.
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(rename_all = "snake_case")]
82pub enum LogStream {
83    Stdout,
84    Stderr,
85    Supervisor,
86}
87
88/// A single log line with metadata.
89#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
90pub struct LogLine {
91    pub stream: LogStream,
92    pub line: String,
93    pub timestamp: u64,
94}
95
96/// Priority level for VM allocation. Higher priority VMs can evict
97/// lower priority ones when the pool is full.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
99#[serde(rename_all = "snake_case")]
100pub enum Priority {
101    /// Background/batch work. First to be evicted.
102    Low = 0,
103    /// Normal interactive work.
104    Normal = 1,
105    /// Urgent work. Can evict Low and Normal.
106    High = 2,
107    /// Critical work. Can evict anything below.
108    Critical = 3,
109}
110
111impl Default for Priority {
112    fn default() -> Self {
113        Priority::Normal
114    }
115}
116
117impl fmt::Display for Priority {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        match self {
120            Priority::Low => f.write_str("low"),
121            Priority::Normal => f.write_str("normal"),
122            Priority::High => f.write_str("high"),
123            Priority::Critical => f.write_str("critical"),
124        }
125    }
126}
127
128/// Configuration for a VM.
129#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
130pub struct VmConfig {
131    /// CPU cores (default: 2).
132    #[serde(default)]
133    pub cpus: Option<u32>,
134    /// Memory in MB (default: 2048).
135    #[serde(default)]
136    pub memory_mb: Option<u32>,
137    /// Priority level for pool eviction.
138    #[serde(default)]
139    pub priority: Priority,
140    /// Environment variables to set.
141    #[serde(default)]
142    pub env: Vec<(String, String)>,
143}
144
145/// Commands sent from Tasks to vm-pool service.
146#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
147#[serde(tag = "type", rename_all = "snake_case")]
148pub enum ServiceCommand {
149    /// Allocate a new VM from the pool.
150    Allocate { image: String, config: VmConfig },
151    /// Deallocate a VM back to the pool.
152    Deallocate { vm_id: VmId },
153    /// Send a command to a VM.
154    Send { vm_id: VmId, command: VmCommand },
155    /// Save VM state to a snapshot.
156    Snapshot { vm_id: VmId, name: String },
157    /// Restore VM from a snapshot.
158    Restore { vm_id: VmId, snapshot: String },
159    /// Get pool status.
160    Status,
161    /// Get last N log lines from a VM.
162    TailLogs { vm_id: VmId, lines: usize },
163    /// Subscribe to real-time logs from a VM (or all VMs if None).
164    SubscribeLogs { vm_id: Option<VmId> },
165    /// Unsubscribe from log streaming.
166    UnsubscribeLogs,
167}
168
169/// Events emitted by vm-pool service to Tasks.
170#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
171#[serde(tag = "type", rename_all = "snake_case")]
172pub enum ServiceEvent {
173    /// VM was allocated.
174    VmAllocated { vm_id: VmId, image: String },
175    /// VM started and supervisor is ready.
176    VmReady { vm_id: VmId },
177    /// Event forwarded from VM supervisor.
178    VmEvent { vm_id: VmId, event: VmEvent },
179    /// VM stopped (graceful).
180    VmStopped { vm_id: VmId },
181    /// VM crashed or was killed.
182    VmCrashed { vm_id: VmId, error: String },
183    /// Pool status response.
184    PoolStatus {
185        total: usize,
186        available: usize,
187        allocated: usize,
188    },
189    /// Log line from a VM (streamed).
190    VmLog {
191        vm_id: VmId,
192        stream: LogStream,
193        line: String,
194    },
195    /// Response to TailLogs command.
196    LogTail { vm_id: VmId, lines: Vec<LogLine> },
197    /// Acknowledgment of log subscription.
198    LogsSubscribed { vm_id: Option<VmId> },
199    /// An error occurred processing a command.
200    Error { message: String },
201}
202
203/// Encode a value as a JSON line (no embedded newlines, terminated by \n).
204pub fn encode_json_line<T: Serialize>(value: &T) -> Result<String, serde_json::Error> {
205    let mut json = serde_json::to_string(value)?;
206    json.push('\n');
207    Ok(json)
208}
209
210/// Decode a JSON line.
211pub fn decode_json_line<'a, T: Deserialize<'a>>(line: &'a str) -> Result<T, serde_json::Error> {
212    serde_json::from_str(line.trim())
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218
219    #[test]
220    fn vm_id_display() {
221        let id = VmId::new("vm-abc123");
222        assert_eq!(id.to_string(), "vm-abc123");
223        assert_eq!(id.as_str(), "vm-abc123");
224    }
225
226    #[test]
227    fn vm_id_serde_transparent() {
228        let id = VmId::new("vm-abc123");
229        let json = serde_json::to_string(&id).unwrap();
230        assert_eq!(json, "\"vm-abc123\"");
231        let parsed: VmId = serde_json::from_str(&json).unwrap();
232        assert_eq!(parsed, id);
233    }
234
235    #[test]
236    fn vm_id_equality_and_hash() {
237        use std::collections::HashSet;
238        let a = VmId::new("vm-1");
239        let b = VmId::from("vm-1".to_string());
240        let c: VmId = "vm-1".into();
241        assert_eq!(a, b);
242        assert_eq!(b, c);
243        let mut set = HashSet::new();
244        set.insert(a);
245        assert!(set.contains(&b));
246    }
247
248    #[test]
249    fn vm_command_execute_roundtrip() {
250        let cmd = VmCommand::Execute {
251            command: "ls -la".into(),
252        };
253        let json = serde_json::to_string(&cmd).unwrap();
254        assert!(json.contains("\"type\":\"execute\""));
255        let parsed: VmCommand = serde_json::from_str(&json).unwrap();
256        assert_eq!(parsed, cmd);
257    }
258
259    #[test]
260    fn vm_command_shutdown_roundtrip() {
261        let cmd = VmCommand::Shutdown;
262        let json = serde_json::to_string(&cmd).unwrap();
263        assert_eq!(json, "{\"type\":\"shutdown\"}");
264        let parsed: VmCommand = serde_json::from_str(&json).unwrap();
265        assert_eq!(parsed, cmd);
266    }
267
268    #[test]
269    fn vm_command_ping_roundtrip() {
270        let cmd = VmCommand::Ping;
271        let json = serde_json::to_string(&cmd).unwrap();
272        assert_eq!(json, "{\"type\":\"ping\"}");
273        let parsed: VmCommand = serde_json::from_str(&json).unwrap();
274        assert_eq!(parsed, cmd);
275    }
276
277    #[test]
278    fn vm_event_ready_roundtrip() {
279        let event = VmEvent::Ready;
280        let json = serde_json::to_string(&event).unwrap();
281        assert_eq!(json, "{\"type\":\"ready\"}");
282        let parsed: VmEvent = serde_json::from_str(&json).unwrap();
283        assert_eq!(parsed, event);
284    }
285
286    #[test]
287    fn vm_event_output_roundtrip() {
288        let event = VmEvent::Output {
289            stream: OutputStream::Stdout,
290            data: "hello world\n".into(),
291        };
292        let json = serde_json::to_string(&event).unwrap();
293        let parsed: VmEvent = serde_json::from_str(&json).unwrap();
294        assert_eq!(parsed, event);
295    }
296
297    #[test]
298    fn vm_event_command_completed_roundtrip() {
299        let event = VmEvent::CommandCompleted { exit_code: 42 };
300        let json = serde_json::to_string(&event).unwrap();
301        let parsed: VmEvent = serde_json::from_str(&json).unwrap();
302        assert_eq!(parsed, event);
303    }
304
305    #[test]
306    fn service_command_allocate_roundtrip() {
307        let cmd = ServiceCommand::Allocate {
308            image: "agent:v1.0.0".into(),
309            config: VmConfig {
310                cpus: Some(2),
311                memory_mb: Some(4096),
312                priority: Priority::High,
313                env: vec![("KEY".into(), "VALUE".into())],
314            },
315        };
316        let json = serde_json::to_string(&cmd).unwrap();
317        let parsed: ServiceCommand = serde_json::from_str(&json).unwrap();
318        assert_eq!(parsed, cmd);
319    }
320
321    #[test]
322    fn service_command_status_roundtrip() {
323        let cmd = ServiceCommand::Status;
324        let json = serde_json::to_string(&cmd).unwrap();
325        assert_eq!(json, "{\"type\":\"status\"}");
326        let parsed: ServiceCommand = serde_json::from_str(&json).unwrap();
327        assert_eq!(parsed, cmd);
328    }
329
330    #[test]
331    fn service_command_send_roundtrip() {
332        let cmd = ServiceCommand::Send {
333            vm_id: VmId::new("vm-abc"),
334            command: VmCommand::Execute {
335                command: "echo hi".into(),
336            },
337        };
338        let json = serde_json::to_string(&cmd).unwrap();
339        let parsed: ServiceCommand = serde_json::from_str(&json).unwrap();
340        assert_eq!(parsed, cmd);
341    }
342
343    #[test]
344    fn service_event_error_roundtrip() {
345        let event = ServiceEvent::Error {
346            message: "pool exhausted".into(),
347        };
348        let json = serde_json::to_string(&event).unwrap();
349        assert!(json.contains("\"type\":\"error\""));
350        let parsed: ServiceEvent = serde_json::from_str(&json).unwrap();
351        assert_eq!(parsed, event);
352    }
353
354    #[test]
355    fn service_event_pool_status_roundtrip() {
356        let event = ServiceEvent::PoolStatus {
357            total: 6,
358            available: 4,
359            allocated: 2,
360        };
361        let json = serde_json::to_string(&event).unwrap();
362        let parsed: ServiceEvent = serde_json::from_str(&json).unwrap();
363        assert_eq!(parsed, event);
364    }
365
366    #[test]
367    fn service_event_log_tail_roundtrip() {
368        let event = ServiceEvent::LogTail {
369            vm_id: VmId::new("vm-1"),
370            lines: vec![
371                LogLine {
372                    stream: LogStream::Stdout,
373                    line: "output line".into(),
374                    timestamp: 1234567890,
375                },
376                LogLine {
377                    stream: LogStream::Stderr,
378                    line: "error line".into(),
379                    timestamp: 1234567891,
380                },
381            ],
382        };
383        let json = serde_json::to_string(&event).unwrap();
384        let parsed: ServiceEvent = serde_json::from_str(&json).unwrap();
385        assert_eq!(parsed, event);
386    }
387
388    #[test]
389    fn vm_config_defaults() {
390        let config = VmConfig::default();
391        assert_eq!(config.cpus, None);
392        assert_eq!(config.memory_mb, None);
393        assert!(config.env.is_empty());
394    }
395
396    #[test]
397    fn vm_config_missing_fields_deserialize() {
398        let json = "{}";
399        let config: VmConfig = serde_json::from_str(json).unwrap();
400        assert_eq!(config, VmConfig::default());
401    }
402
403    #[test]
404    fn encode_decode_json_line() {
405        let cmd = VmCommand::Ping;
406        let line = encode_json_line(&cmd).unwrap();
407        assert!(line.ends_with('\n'));
408        assert!(!line[..line.len() - 1].contains('\n'));
409        let parsed: VmCommand = decode_json_line(&line).unwrap();
410        assert_eq!(parsed, cmd);
411    }
412
413    #[test]
414    fn log_stream_variants() {
415        let streams = [LogStream::Stdout, LogStream::Stderr, LogStream::Supervisor];
416        for stream in streams {
417            let json = serde_json::to_string(&stream).unwrap();
418            let parsed: LogStream = serde_json::from_str(&json).unwrap();
419            assert_eq!(parsed, stream);
420        }
421    }
422
423    #[test]
424    fn output_stream_variants() {
425        let streams = [OutputStream::Stdout, OutputStream::Stderr];
426        for stream in streams {
427            let json = serde_json::to_string(&stream).unwrap();
428            let parsed: OutputStream = serde_json::from_str(&json).unwrap();
429            assert_eq!(parsed, stream);
430        }
431    }
432
433    #[test]
434    fn service_command_subscribe_logs_with_vm_id() {
435        let cmd = ServiceCommand::SubscribeLogs {
436            vm_id: Some(VmId::new("vm-1")),
437        };
438        let json = serde_json::to_string(&cmd).unwrap();
439        let parsed: ServiceCommand = serde_json::from_str(&json).unwrap();
440        assert_eq!(parsed, cmd);
441    }
442
443    #[test]
444    fn service_command_subscribe_logs_all() {
445        let cmd = ServiceCommand::SubscribeLogs { vm_id: None };
446        let json = serde_json::to_string(&cmd).unwrap();
447        let parsed: ServiceCommand = serde_json::from_str(&json).unwrap();
448        assert_eq!(parsed, cmd);
449    }
450}