Skip to main content

ralph_api/
collection_domain.rs

1mod yaml;
2
3use std::collections::BTreeMap;
4use std::fs;
5use std::path::{Path, PathBuf};
6
7use chrono::Utc;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use tracing::warn;
11
12use crate::errors::ApiError;
13use crate::loop_support::now_ts;
14
15use self::yaml::{export_collection_yaml, graph_from_yaml};
16
17#[derive(Debug, Clone, Deserialize)]
18#[serde(rename_all = "camelCase")]
19pub struct CollectionCreateParams {
20    pub name: String,
21    pub description: Option<String>,
22    pub graph: Option<Value>,
23}
24
25#[derive(Debug, Clone, Deserialize)]
26#[serde(rename_all = "camelCase")]
27pub struct CollectionUpdateParams {
28    pub id: String,
29    pub name: Option<String>,
30    pub description: Option<String>,
31    pub graph: Option<Value>,
32}
33
34#[derive(Debug, Clone, Deserialize)]
35#[serde(rename_all = "camelCase")]
36pub struct CollectionImportParams {
37    pub yaml: String,
38    pub name: String,
39    pub description: Option<String>,
40}
41
42#[derive(Debug, Clone, Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct CollectionRunParams {
45    pub id: String,
46    pub prompt: String,
47}
48
49#[derive(Debug, Clone, Serialize)]
50#[serde(rename_all = "camelCase")]
51pub struct CollectionRunResult {
52    pub success: bool,
53    pub config_path: String,
54    pub pid: u32,
55    /// The hat that will activate first, derived from the graph topology.
56    /// The frontend uses this to highlight the entry node immediately
57    /// without waiting for the first WebSocket event (timing-race fix).
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub starting_hat: Option<String>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(rename_all = "camelCase")]
64pub struct CollectionSummary {
65    pub id: String,
66    pub name: String,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub description: Option<String>,
69    pub created_at: String,
70    pub updated_at: String,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74#[serde(rename_all = "camelCase")]
75pub struct CollectionRecord {
76    pub id: String,
77    pub name: String,
78    #[serde(skip_serializing_if = "Option::is_none")]
79    pub description: Option<String>,
80    pub graph: GraphData,
81    pub created_at: String,
82    pub updated_at: String,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86#[serde(rename_all = "camelCase")]
87pub struct GraphData {
88    pub nodes: Vec<GraphNode>,
89    pub edges: Vec<GraphEdge>,
90    pub viewport: Viewport,
91}
92
93impl Default for GraphData {
94    fn default() -> Self {
95        Self {
96            nodes: Vec::new(),
97            edges: Vec::new(),
98            viewport: Viewport {
99                x: 0.0,
100                y: 0.0,
101                zoom: 1.0,
102            },
103        }
104    }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
108#[serde(rename_all = "camelCase")]
109pub struct GraphNode {
110    pub id: String,
111    #[serde(rename = "type")]
112    pub node_type: String,
113    pub position: NodePosition,
114    pub data: HatNodeData,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118#[serde(rename_all = "camelCase")]
119pub struct NodePosition {
120    pub x: f64,
121    pub y: f64,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
125#[serde(rename_all = "camelCase")]
126pub struct HatNodeData {
127    pub key: String,
128    pub name: String,
129    pub description: String,
130    pub triggers_on: Vec<String>,
131    pub publishes: Vec<String>,
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub instructions: Option<String>,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137#[serde(rename_all = "camelCase")]
138pub struct GraphEdge {
139    pub id: String,
140    pub source: String,
141    pub target: String,
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub source_handle: Option<String>,
144    #[serde(skip_serializing_if = "Option::is_none")]
145    pub target_handle: Option<String>,
146    #[serde(skip_serializing_if = "Option::is_none")]
147    pub label: Option<String>,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
151#[serde(rename_all = "camelCase")]
152pub struct Viewport {
153    pub x: f64,
154    pub y: f64,
155    pub zoom: f64,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize, Default)]
159#[serde(rename_all = "camelCase")]
160struct CollectionSnapshot {
161    collections: Vec<CollectionRecord>,
162    id_counter: u64,
163}
164
165pub struct CollectionDomain {
166    store_path: PathBuf,
167    collections: BTreeMap<String, CollectionRecord>,
168    id_counter: u64,
169}
170
171impl CollectionDomain {
172    pub fn new(workspace_root: impl AsRef<Path>) -> Self {
173        let store_path = workspace_root
174            .as_ref()
175            .join(".ralph/api/collections-v1.json");
176        let mut domain = Self {
177            store_path,
178            collections: BTreeMap::new(),
179            id_counter: 0,
180        };
181        domain.load();
182        domain
183    }
184
185    pub fn list(&self) -> Vec<CollectionSummary> {
186        let mut entries: Vec<_> = self
187            .collections
188            .values()
189            .map(|collection| CollectionSummary {
190                id: collection.id.clone(),
191                name: collection.name.clone(),
192                description: collection.description.clone(),
193                created_at: collection.created_at.clone(),
194                updated_at: collection.updated_at.clone(),
195            })
196            .collect();
197
198        entries.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
199        entries
200    }
201
202    pub fn get(&self, id: &str) -> Result<CollectionRecord, ApiError> {
203        self.collections
204            .get(id)
205            .cloned()
206            .ok_or_else(|| collection_not_found_error(id))
207    }
208
209    pub fn create(&mut self, params: CollectionCreateParams) -> Result<CollectionRecord, ApiError> {
210        if params.name.trim().is_empty() {
211            return Err(ApiError::invalid_params(
212                "collection name must not be empty",
213            ));
214        }
215
216        let graph = params
217            .graph
218            .map(parse_graph)
219            .transpose()?
220            .unwrap_or_default();
221
222        let now = now_ts();
223        let id = self.next_collection_id();
224
225        let record = CollectionRecord {
226            id: id.clone(),
227            name: params.name,
228            description: params.description,
229            graph,
230            created_at: now.clone(),
231            updated_at: now,
232        };
233
234        self.collections.insert(id.clone(), record);
235        self.persist()?;
236        self.get(&id)
237    }
238
239    pub fn update(&mut self, params: CollectionUpdateParams) -> Result<CollectionRecord, ApiError> {
240        let record = self
241            .collections
242            .get_mut(&params.id)
243            .ok_or_else(|| collection_not_found_error(&params.id))?;
244
245        if let Some(ref name) = params.name {
246            if name.trim().is_empty() {
247                return Err(ApiError::invalid_params(
248                    "collection name must not be empty",
249                ));
250            }
251            record.name = name.clone();
252        }
253
254        if let Some(description) = params.description {
255            record.description = Some(description);
256        }
257
258        if let Some(graph) = params.graph {
259            record.graph = parse_graph(graph)?;
260        }
261
262        record.updated_at = now_ts();
263        self.persist()?;
264        self.get(&params.id)
265    }
266
267    pub fn delete(&mut self, id: &str) -> Result<(), ApiError> {
268        if self.collections.remove(id).is_none() {
269            return Err(collection_not_found_error(id));
270        }
271
272        self.persist()
273    }
274
275    pub fn import(&mut self, params: CollectionImportParams) -> Result<CollectionRecord, ApiError> {
276        let graph = graph_from_yaml(&params.yaml)?;
277        self.create(CollectionCreateParams {
278            name: params.name,
279            description: params.description,
280            graph: Some(serde_json::to_value(graph).map_err(|error| {
281                ApiError::internal(format!("failed serializing graph: {error}"))
282            })?),
283        })
284    }
285
286    pub fn export(&self, id: &str) -> Result<String, ApiError> {
287        let collection = self.get(id)?;
288        export_collection_yaml(&collection)
289    }
290
291    /// Export the collection's hats to a temp YAML file and spawn `ralph run`
292    /// with it via the `-H` flag. The user's existing `ralph.yml` provides
293    /// core config (backend, max_iterations, backpressure). The collection
294    /// only provides hats and events.
295    ///
296    /// Returns the PID so the frontend can track the process.
297    pub fn run(
298        &self,
299        params: CollectionRunParams,
300        ralph_command: &str,
301        workspace_root: &Path,
302    ) -> Result<CollectionRunResult, ApiError> {
303        let yaml = self.export(&params.id)?;
304
305        // Write the exported YAML to a predictable path.
306        let collections_dir = workspace_root.join(".ralph/collections");
307        fs::create_dir_all(&collections_dir).map_err(|error| {
308            ApiError::internal(format!(
309                "failed creating collections run directory: {error}"
310            ))
311        })?;
312
313        let config_path = collections_dir.join(format!("{}-run.yml", params.id));
314        fs::write(&config_path, &yaml).map_err(|error| {
315            ApiError::internal(format!(
316                "failed writing collection run config '{}': {error}",
317                config_path.display()
318            ))
319        })?;
320
321        // Spawn ralph run with -H (hats overlay) so the user's ralph.yml
322        // provides backend/max_iterations/backpressure and the collection
323        // provides hats/events. -a (autonomous) forces headless mode, which
324        // is required when the API spawns ralph: interactive mode tries to
325        // read from a tty the background process doesn't own and gets
326        // SIGSTOP'd by the OS. Autonomous mode implies --no-tui.
327        let child = std::process::Command::new(ralph_command)
328            .current_dir(workspace_root)
329            .args([
330                "run",
331                "-H",
332                &config_path.to_string_lossy(),
333                "-a",
334                "-p",
335                &params.prompt,
336            ])
337            .stdout(std::process::Stdio::null())
338            .stderr(std::process::Stdio::piped())
339            .spawn()
340            .map_err(|error| {
341                ApiError::internal(format!(
342                    "ralph CLI not found or failed to start. Install ralph or set RALPH_API_RALPH_COMMAND. Error: {error}"
343                ))
344            })?;
345
346        let pid = child.id();
347
348        // Wait briefly to check if the process died immediately.
349        let mut child = child;
350        std::thread::sleep(std::time::Duration::from_millis(500));
351        match child.try_wait() {
352            Ok(Some(status)) if !status.success() => {
353                let mut stderr_output = String::new();
354                if let Some(mut stderr) = child.stderr.take() {
355                    use std::io::Read;
356                    let _ = stderr.read_to_string(&mut stderr_output);
357                }
358                let trimmed = stderr_output.trim();
359                // `status.code()` is `Some(code)` on normal exit; `None` means
360                // signal-terminated on Unix. We format both cleanly to avoid
361                // the double "exit status:" prefix that `ExitStatus: Display`
362                // would produce.
363                let status_label = match status.code() {
364                    Some(code) => format!("exit code {code}"),
365                    None => format!("{status}"),
366                };
367                let message = if trimmed.is_empty() {
368                    format!("ralph run exited with {status_label} (no stderr)")
369                } else {
370                    // Pass ralph's stderr through verbatim. Spawn-failure
371                    // output is small; truncation risks hiding the actual
372                    // error line.
373                    format!("ralph run exited with {status_label}:\n{trimmed}")
374                };
375                return Err(ApiError::internal(message));
376            }
377            _ => {
378                // Still running or exited successfully. Detach a reaper so
379                // the eventual exit doesn't leave a zombie process — the
380                // API may outlive many loop runs.
381                std::thread::spawn(move || {
382                    let _ = child.wait();
383                });
384            }
385        }
386
387        // Compute the starting hat from the collection's topology so the
388        // frontend can highlight it immediately (timing-race fix).
389        let collection = self.get(&params.id)?;
390        let starting_hat = yaml::starting_hat_for_collection(&collection);
391
392        Ok(CollectionRunResult {
393            success: true,
394            config_path: config_path.to_string_lossy().to_string(),
395            pid,
396            starting_hat,
397        })
398    }
399
400    fn next_collection_id(&mut self) -> String {
401        self.id_counter = self.id_counter.saturating_add(1);
402        format!(
403            "collection-{}-{:04x}",
404            Utc::now().timestamp_millis(),
405            self.id_counter
406        )
407    }
408
409    fn load(&mut self) {
410        if !self.store_path.exists() {
411            return;
412        }
413
414        let content = match fs::read_to_string(&self.store_path) {
415            Ok(content) => content,
416            Err(error) => {
417                warn!(
418                    path = %self.store_path.display(),
419                    %error,
420                    "failed reading collection snapshot"
421                );
422                return;
423            }
424        };
425
426        let snapshot: CollectionSnapshot = match serde_json::from_str(&content) {
427            Ok(snapshot) => snapshot,
428            Err(error) => {
429                warn!(
430                    path = %self.store_path.display(),
431                    %error,
432                    "failed parsing collection snapshot"
433                );
434                return;
435            }
436        };
437
438        self.collections = snapshot
439            .collections
440            .into_iter()
441            .map(|collection| (collection.id.clone(), collection))
442            .collect();
443        self.id_counter = snapshot.id_counter;
444    }
445
446    fn persist(&self) -> Result<(), ApiError> {
447        if let Some(parent) = self.store_path.parent() {
448            fs::create_dir_all(parent).map_err(|error| {
449                ApiError::internal(format!(
450                    "failed creating collection snapshot directory '{}': {error}",
451                    parent.display()
452                ))
453            })?;
454        }
455
456        let snapshot = CollectionSnapshot {
457            collections: self.sorted_records(),
458            id_counter: self.id_counter,
459        };
460
461        let payload = serde_json::to_string_pretty(&snapshot).map_err(|error| {
462            ApiError::internal(format!("failed serializing collections snapshot: {error}"))
463        })?;
464
465        fs::write(&self.store_path, payload).map_err(|error| {
466            ApiError::internal(format!(
467                "failed writing collection snapshot '{}': {error}",
468                self.store_path.display()
469            ))
470        })
471    }
472
473    fn sorted_records(&self) -> Vec<CollectionRecord> {
474        let mut records: Vec<_> = self.collections.values().cloned().collect();
475        records.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
476        records
477    }
478}
479
480fn parse_graph(raw: Value) -> Result<GraphData, ApiError> {
481    serde_json::from_value(raw)
482        .map_err(|error| ApiError::invalid_params(format!("invalid collection graph: {error}")))
483}
484
485fn collection_not_found_error(collection_id: &str) -> ApiError {
486    ApiError::collection_not_found(format!("Collection with id '{collection_id}' not found"))
487        .with_details(serde_json::json!({ "collectionId": collection_id }))
488}
489
490#[cfg(test)]
491mod tests {
492    use super::*;
493    use tempfile::TempDir;
494
495    /// Create a minimal collection in a fresh domain so `run()` has something
496    /// to export.
497    fn fixture_with_collection() -> (TempDir, CollectionDomain, String) {
498        let temp = TempDir::new().expect("tempdir");
499        let mut domain = CollectionDomain::new(temp.path());
500        let record = domain
501            .create(CollectionCreateParams {
502                name: "Test".to_string(),
503                description: None,
504                graph: None,
505            })
506            .expect("create collection");
507        (temp, domain, record.id)
508    }
509
510    #[test]
511    fn run_surfaces_stderr_from_failed_spawn() {
512        // A stub "ralph" that prints a distinctive error and exits non-zero.
513        // /bin/sh is universally available on macOS + Linux (Ralph's only
514        // supported platforms).
515        let (temp, domain, id) = fixture_with_collection();
516
517        // We invoke /bin/sh with `-c` so the arg list ralph.run() builds
518        // (`run -H <path> --no-tui -p <prompt>`) gets passed as extra
519        // positional args; sh echoes to stderr and exits 1 regardless.
520        let script = r#"echo "pi: command not found (simulated)" >&2; exit 1"#;
521
522        // Build a wrapper script that always writes the marker to stderr
523        // and exits 1. We'll point ralph_command at it.
524        let wrapper_path = temp.path().join("fake-ralph.sh");
525        std::fs::write(&wrapper_path, format!("#!/bin/sh\n{script}\n")).expect("write wrapper");
526        use std::os::unix::fs::PermissionsExt;
527        let mut perms = std::fs::metadata(&wrapper_path).unwrap().permissions();
528        perms.set_mode(0o755);
529        std::fs::set_permissions(&wrapper_path, perms).expect("chmod");
530
531        let result = domain.run(
532            CollectionRunParams {
533                id,
534                prompt: "hello".to_string(),
535            },
536            wrapper_path.to_str().expect("wrapper path"),
537            temp.path(),
538        );
539
540        let err = result.expect_err("fake ralph should fail");
541        let message = format!("{err:?}");
542        assert!(
543            message.contains("pi: command not found (simulated)"),
544            "error should contain the underlying stderr; got: {message}"
545        );
546        assert!(
547            message.contains("exit code 1"),
548            "error should name the exit code; got: {message}"
549        );
550        assert!(
551            !message.contains("..."),
552            "error should not truncate stderr; got: {message}"
553        );
554    }
555
556    #[test]
557    fn create_rejects_empty_name() {
558        let temp = TempDir::new().expect("tempdir");
559        let mut domain = CollectionDomain::new(temp.path());
560        let err = domain
561            .create(CollectionCreateParams {
562                name: "  ".to_string(),
563                description: None,
564                graph: None,
565            })
566            .expect_err("empty name should fail");
567        assert!(format!("{err:?}").contains("name must not be empty"));
568    }
569
570    #[test]
571    fn update_rejects_empty_name() {
572        let (_temp, mut domain, id) = fixture_with_collection();
573        let err = domain
574            .update(CollectionUpdateParams {
575                id,
576                name: Some(String::new()),
577                description: None,
578                graph: None,
579            })
580            .expect_err("empty name should fail");
581        assert!(format!("{err:?}").contains("name must not be empty"));
582    }
583
584    #[test]
585    fn run_handles_missing_ralph_binary() {
586        let (temp, domain, id) = fixture_with_collection();
587        let missing = temp.path().join("definitely-not-here");
588
589        let result = domain.run(
590            CollectionRunParams {
591                id,
592                prompt: "hello".to_string(),
593            },
594            missing.to_str().expect("missing path"),
595            temp.path(),
596        );
597
598        let err = result.expect_err("missing binary should fail");
599        let message = format!("{err:?}");
600        assert!(
601            message.contains("ralph CLI not found"),
602            "error should name the spawn failure; got: {message}"
603        );
604    }
605}