1mod yaml;
2
3use std::collections::BTreeMap;
4use std::fs;
5use std::path::{Path, PathBuf};
6
7use chrono::Utc;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use tracing::warn;
11
12use crate::errors::ApiError;
13use crate::loop_support::now_ts;
14
15use self::yaml::{export_collection_yaml, graph_from_yaml};
16
/// Request parameters consumed by [`CollectionDomain::create`].
///
/// Deserialized with camelCase field names.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CollectionCreateParams {
    /// Name of the new collection; `create` rejects it when it is empty after trimming.
    pub name: String,
    /// Optional description stored on the record as-is.
    pub description: Option<String>,
    /// Optional raw JSON graph. `create` parses it into [`GraphData`] and uses
    /// `GraphData::default()` when it is absent.
    pub graph: Option<Value>,
}
24
/// Request parameters consumed by [`CollectionDomain::update`].
///
/// Deserialized with camelCase field names. Every field except `id` is optional;
/// `update` only overwrites the record fields that are `Some`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CollectionUpdateParams {
    /// Id of the collection to modify.
    pub id: String,
    /// Replacement name; rejected by `update` when empty after trimming.
    pub name: Option<String>,
    /// Replacement description. `None` leaves the stored description untouched,
    /// so this field cannot be used to clear it.
    pub description: Option<String>,
    /// Replacement graph as raw JSON; parsed into [`GraphData`] by `update`.
    pub graph: Option<Value>,
}
33
/// Request parameters consumed by [`CollectionDomain::import`].
///
/// Deserialized with camelCase field names.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CollectionImportParams {
    /// YAML document handed to `graph_from_yaml` to build the collection graph.
    pub yaml: String,
    /// Name for the imported collection (forwarded to `create`).
    pub name: String,
    /// Optional description for the imported collection (forwarded to `create`).
    pub description: Option<String>,
}
41
/// Request parameters consumed by [`CollectionDomain::run`].
///
/// Deserialized with camelCase field names.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CollectionRunParams {
    /// Id of the collection to export and run.
    pub id: String,
    /// Prompt text passed to the spawned command as the value of `-p`.
    pub prompt: String,
}
48
/// Successful outcome of [`CollectionDomain::run`].
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CollectionRunResult {
    /// Set to `true` by `run` whenever it returns `Ok`.
    pub success: bool,
    /// Path of the YAML config file that `run` wrote and passed to the command.
    pub config_path: String,
    /// Process id of the spawned child, as reported by `Child::id()`.
    pub pid: u32,
    /// Value returned by `yaml::starting_hat_for_collection`; omitted from the
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub starting_hat: Option<String>,
}
61
/// Listing entry produced by [`CollectionDomain::list`]: a [`CollectionRecord`]
/// without its graph.
///
/// (De)serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CollectionSummary {
    /// Collection id.
    pub id: String,
    /// Collection name.
    pub name: String,
    /// Optional description; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Creation timestamp copied from the record.
    pub created_at: String,
    /// Last-update timestamp copied from the record.
    pub updated_at: String,
}
72
/// A stored collection: metadata plus its graph.
///
/// This is the unit kept in memory by [`CollectionDomain`] and written to the
/// snapshot file. (De)serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CollectionRecord {
    /// Collection id; also the key under which the record is stored.
    pub id: String,
    /// Collection name.
    pub name: String,
    /// Optional description; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Graph belonging to this collection.
    pub graph: GraphData,
    /// Timestamp string taken from `now_ts()` when the record was created.
    pub created_at: String,
    /// Timestamp string taken from `now_ts()` on creation and on every update.
    pub updated_at: String,
}
84
/// Graph payload of a collection.
///
/// All three fields are required when deserializing (no serde defaults are
/// declared). (De)serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GraphData {
    /// Nodes of the graph.
    pub nodes: Vec<GraphNode>,
    /// Edges of the graph.
    pub edges: Vec<GraphEdge>,
    /// Viewport stored alongside the graph.
    // NOTE(review): presumably the editor canvas pan/zoom state — confirm with the UI.
    pub viewport: Viewport,
}
92
93impl Default for GraphData {
94 fn default() -> Self {
95 Self {
96 nodes: Vec::new(),
97 edges: Vec::new(),
98 viewport: Viewport {
99 x: 0.0,
100 y: 0.0,
101 zoom: 1.0,
102 },
103 }
104 }
105}
106
/// A single node of a collection graph.
///
/// (De)serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GraphNode {
    /// Node id.
    pub id: String,
    /// Node type tag; appears as `type` in the serialized form.
    #[serde(rename = "type")]
    pub node_type: String,
    /// Position of the node.
    pub position: NodePosition,
    /// Hat payload attached to the node.
    pub data: HatNodeData,
}
116
/// Two-dimensional position of a [`GraphNode`].
// NOTE(review): presumably canvas coordinates from the graph editor — units are
// not established anywhere in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NodePosition {
    /// Horizontal coordinate.
    pub x: f64,
    /// Vertical coordinate.
    pub y: f64,
}
123
/// Payload carried by a [`GraphNode`], describing one "hat".
///
/// (De)serialized with camelCase field names; every field except
/// `instructions` is required when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HatNodeData {
    /// Key of the hat.
    // NOTE(review): presumably the hat's identifier in the exported YAML — confirm in `yaml`.
    pub key: String,
    /// Display name of the hat.
    pub name: String,
    /// Description of the hat.
    pub description: String,
    /// Serialized as `triggersOn`.
    // NOTE(review): presumably the event names this hat reacts to — confirm in `yaml`.
    pub triggers_on: Vec<String>,
    /// NOTE(review): presumably the event names this hat emits — confirm in `yaml`.
    pub publishes: Vec<String>,
    /// Optional instructions text; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub instructions: Option<String>,
}
135
/// A single edge of a collection graph.
///
/// (De)serialized with camelCase field names; the optional fields are omitted
/// from the serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GraphEdge {
    /// Edge id.
    pub id: String,
    /// Source endpoint of the edge.
    // NOTE(review): presumably a `GraphNode::id` — nothing in this file validates it.
    pub source: String,
    /// Target endpoint of the edge.
    // NOTE(review): presumably a `GraphNode::id` — nothing in this file validates it.
    pub target: String,
    /// Optional handle on the source side (serialized as `sourceHandle`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_handle: Option<String>,
    /// Optional handle on the target side (serialized as `targetHandle`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_handle: Option<String>,
    /// Optional label of the edge.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
}
149
/// Viewport stored with a [`GraphData`].
///
/// `GraphData::default()` uses `x = 0.0`, `y = 0.0`, `zoom = 1.0`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Viewport {
    /// Horizontal offset.
    pub x: f64,
    /// Vertical offset.
    pub y: f64,
    /// Zoom factor.
    pub zoom: f64,
}
157
/// On-disk shape of the collection store.
///
/// Written by `CollectionDomain::persist` and read back by
/// `CollectionDomain::load`. (De)serialized with camelCase field names, so the
/// counter appears as `idCounter`; both fields are required when parsing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct CollectionSnapshot {
    /// Every stored collection record.
    collections: Vec<CollectionRecord>,
    /// Last value of the id counter, restored on load.
    id_counter: u64,
}
164
/// In-memory collection store backed by a JSON snapshot file.
pub struct CollectionDomain {
    /// Location of the snapshot file (`.ralph/api/collections-v1.json` under
    /// the workspace root given to `new`).
    store_path: PathBuf,
    /// All known collections, keyed by collection id.
    collections: BTreeMap<String, CollectionRecord>,
    /// Monotonic counter mixed into newly generated collection ids.
    id_counter: u64,
}
170
171impl CollectionDomain {
172 pub fn new(workspace_root: impl AsRef<Path>) -> Self {
173 let store_path = workspace_root
174 .as_ref()
175 .join(".ralph/api/collections-v1.json");
176 let mut domain = Self {
177 store_path,
178 collections: BTreeMap::new(),
179 id_counter: 0,
180 };
181 domain.load();
182 domain
183 }
184
185 pub fn list(&self) -> Vec<CollectionSummary> {
186 let mut entries: Vec<_> = self
187 .collections
188 .values()
189 .map(|collection| CollectionSummary {
190 id: collection.id.clone(),
191 name: collection.name.clone(),
192 description: collection.description.clone(),
193 created_at: collection.created_at.clone(),
194 updated_at: collection.updated_at.clone(),
195 })
196 .collect();
197
198 entries.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
199 entries
200 }
201
202 pub fn get(&self, id: &str) -> Result<CollectionRecord, ApiError> {
203 self.collections
204 .get(id)
205 .cloned()
206 .ok_or_else(|| collection_not_found_error(id))
207 }
208
209 pub fn create(&mut self, params: CollectionCreateParams) -> Result<CollectionRecord, ApiError> {
210 if params.name.trim().is_empty() {
211 return Err(ApiError::invalid_params(
212 "collection name must not be empty",
213 ));
214 }
215
216 let graph = params
217 .graph
218 .map(parse_graph)
219 .transpose()?
220 .unwrap_or_default();
221
222 let now = now_ts();
223 let id = self.next_collection_id();
224
225 let record = CollectionRecord {
226 id: id.clone(),
227 name: params.name,
228 description: params.description,
229 graph,
230 created_at: now.clone(),
231 updated_at: now,
232 };
233
234 self.collections.insert(id.clone(), record);
235 self.persist()?;
236 self.get(&id)
237 }
238
239 pub fn update(&mut self, params: CollectionUpdateParams) -> Result<CollectionRecord, ApiError> {
240 let record = self
241 .collections
242 .get_mut(¶ms.id)
243 .ok_or_else(|| collection_not_found_error(¶ms.id))?;
244
245 if let Some(ref name) = params.name {
246 if name.trim().is_empty() {
247 return Err(ApiError::invalid_params(
248 "collection name must not be empty",
249 ));
250 }
251 record.name = name.clone();
252 }
253
254 if let Some(description) = params.description {
255 record.description = Some(description);
256 }
257
258 if let Some(graph) = params.graph {
259 record.graph = parse_graph(graph)?;
260 }
261
262 record.updated_at = now_ts();
263 self.persist()?;
264 self.get(¶ms.id)
265 }
266
267 pub fn delete(&mut self, id: &str) -> Result<(), ApiError> {
268 if self.collections.remove(id).is_none() {
269 return Err(collection_not_found_error(id));
270 }
271
272 self.persist()
273 }
274
275 pub fn import(&mut self, params: CollectionImportParams) -> Result<CollectionRecord, ApiError> {
276 let graph = graph_from_yaml(¶ms.yaml)?;
277 self.create(CollectionCreateParams {
278 name: params.name,
279 description: params.description,
280 graph: Some(serde_json::to_value(graph).map_err(|error| {
281 ApiError::internal(format!("failed serializing graph: {error}"))
282 })?),
283 })
284 }
285
286 pub fn export(&self, id: &str) -> Result<String, ApiError> {
287 let collection = self.get(id)?;
288 export_collection_yaml(&collection)
289 }
290
291 pub fn run(
298 &self,
299 params: CollectionRunParams,
300 ralph_command: &str,
301 workspace_root: &Path,
302 ) -> Result<CollectionRunResult, ApiError> {
303 let yaml = self.export(¶ms.id)?;
304
305 let collections_dir = workspace_root.join(".ralph/collections");
307 fs::create_dir_all(&collections_dir).map_err(|error| {
308 ApiError::internal(format!(
309 "failed creating collections run directory: {error}"
310 ))
311 })?;
312
313 let config_path = collections_dir.join(format!("{}-run.yml", params.id));
314 fs::write(&config_path, &yaml).map_err(|error| {
315 ApiError::internal(format!(
316 "failed writing collection run config '{}': {error}",
317 config_path.display()
318 ))
319 })?;
320
321 let child = std::process::Command::new(ralph_command)
328 .current_dir(workspace_root)
329 .args([
330 "run",
331 "-H",
332 &config_path.to_string_lossy(),
333 "-a",
334 "-p",
335 ¶ms.prompt,
336 ])
337 .stdout(std::process::Stdio::null())
338 .stderr(std::process::Stdio::piped())
339 .spawn()
340 .map_err(|error| {
341 ApiError::internal(format!(
342 "ralph CLI not found or failed to start. Install ralph or set RALPH_API_RALPH_COMMAND. Error: {error}"
343 ))
344 })?;
345
346 let pid = child.id();
347
348 let mut child = child;
350 std::thread::sleep(std::time::Duration::from_millis(500));
351 match child.try_wait() {
352 Ok(Some(status)) if !status.success() => {
353 let mut stderr_output = String::new();
354 if let Some(mut stderr) = child.stderr.take() {
355 use std::io::Read;
356 let _ = stderr.read_to_string(&mut stderr_output);
357 }
358 let trimmed = stderr_output.trim();
359 let status_label = match status.code() {
364 Some(code) => format!("exit code {code}"),
365 None => format!("{status}"),
366 };
367 let message = if trimmed.is_empty() {
368 format!("ralph run exited with {status_label} (no stderr)")
369 } else {
370 format!("ralph run exited with {status_label}:\n{trimmed}")
374 };
375 return Err(ApiError::internal(message));
376 }
377 _ => {
378 std::thread::spawn(move || {
382 let _ = child.wait();
383 });
384 }
385 }
386
387 let collection = self.get(¶ms.id)?;
390 let starting_hat = yaml::starting_hat_for_collection(&collection);
391
392 Ok(CollectionRunResult {
393 success: true,
394 config_path: config_path.to_string_lossy().to_string(),
395 pid,
396 starting_hat,
397 })
398 }
399
400 fn next_collection_id(&mut self) -> String {
401 self.id_counter = self.id_counter.saturating_add(1);
402 format!(
403 "collection-{}-{:04x}",
404 Utc::now().timestamp_millis(),
405 self.id_counter
406 )
407 }
408
409 fn load(&mut self) {
410 if !self.store_path.exists() {
411 return;
412 }
413
414 let content = match fs::read_to_string(&self.store_path) {
415 Ok(content) => content,
416 Err(error) => {
417 warn!(
418 path = %self.store_path.display(),
419 %error,
420 "failed reading collection snapshot"
421 );
422 return;
423 }
424 };
425
426 let snapshot: CollectionSnapshot = match serde_json::from_str(&content) {
427 Ok(snapshot) => snapshot,
428 Err(error) => {
429 warn!(
430 path = %self.store_path.display(),
431 %error,
432 "failed parsing collection snapshot"
433 );
434 return;
435 }
436 };
437
438 self.collections = snapshot
439 .collections
440 .into_iter()
441 .map(|collection| (collection.id.clone(), collection))
442 .collect();
443 self.id_counter = snapshot.id_counter;
444 }
445
446 fn persist(&self) -> Result<(), ApiError> {
447 if let Some(parent) = self.store_path.parent() {
448 fs::create_dir_all(parent).map_err(|error| {
449 ApiError::internal(format!(
450 "failed creating collection snapshot directory '{}': {error}",
451 parent.display()
452 ))
453 })?;
454 }
455
456 let snapshot = CollectionSnapshot {
457 collections: self.sorted_records(),
458 id_counter: self.id_counter,
459 };
460
461 let payload = serde_json::to_string_pretty(&snapshot).map_err(|error| {
462 ApiError::internal(format!("failed serializing collections snapshot: {error}"))
463 })?;
464
465 fs::write(&self.store_path, payload).map_err(|error| {
466 ApiError::internal(format!(
467 "failed writing collection snapshot '{}': {error}",
468 self.store_path.display()
469 ))
470 })
471 }
472
473 fn sorted_records(&self) -> Vec<CollectionRecord> {
474 let mut records: Vec<_> = self.collections.values().cloned().collect();
475 records.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
476 records
477 }
478}
479
480fn parse_graph(raw: Value) -> Result<GraphData, ApiError> {
481 serde_json::from_value(raw)
482 .map_err(|error| ApiError::invalid_params(format!("invalid collection graph: {error}")))
483}
484
485fn collection_not_found_error(collection_id: &str) -> ApiError {
486 ApiError::collection_not_found(format!("Collection with id '{collection_id}' not found"))
487 .with_details(serde_json::json!({ "collectionId": collection_id }))
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a domain in a fresh temp workspace containing one collection
    /// named "Test". The `TempDir` is returned so the workspace outlives the
    /// test body.
    fn fixture_with_collection() -> (TempDir, CollectionDomain, String) {
        let temp = TempDir::new().expect("tempdir");
        let mut domain = CollectionDomain::new(temp.path());
        let record = domain
            .create(CollectionCreateParams {
                name: "Test".to_string(),
                description: None,
                graph: None,
            })
            .expect("create collection");
        (temp, domain, record.id)
    }

    /// A command that exits non-zero right away must be reported together with
    /// its exit code and its complete stderr output.
    // Gated to Unix: the fake command is a `/bin/sh` script made executable
    // through `std::os::unix`, neither of which exists on other targets.
    #[cfg(unix)]
    #[test]
    fn run_surfaces_stderr_from_failed_spawn() {
        use std::os::unix::fs::PermissionsExt;

        let (temp, domain, id) = fixture_with_collection();

        let script = r#"echo "pi: command not found (simulated)" >&2; exit 1"#;

        let wrapper_path = temp.path().join("fake-ralph.sh");
        std::fs::write(&wrapper_path, format!("#!/bin/sh\n{script}\n")).expect("write wrapper");
        let mut perms = std::fs::metadata(&wrapper_path)
            .expect("stat wrapper")
            .permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&wrapper_path, perms).expect("chmod");

        let result = domain.run(
            CollectionRunParams {
                id,
                prompt: "hello".to_string(),
            },
            wrapper_path.to_str().expect("wrapper path"),
            temp.path(),
        );

        let err = result.expect_err("fake ralph should fail");
        let message = format!("{err:?}");
        assert!(
            message.contains("pi: command not found (simulated)"),
            "error should contain the underlying stderr; got: {message}"
        );
        assert!(
            message.contains("exit code 1"),
            "error should name the exit code; got: {message}"
        );
        assert!(
            !message.contains("..."),
            "error should not truncate stderr; got: {message}"
        );
    }

    /// `create` must refuse a name that consists only of whitespace.
    #[test]
    fn create_rejects_empty_name() {
        let temp = TempDir::new().expect("tempdir");
        let mut domain = CollectionDomain::new(temp.path());
        let err = domain
            .create(CollectionCreateParams {
                name: "  ".to_string(),
                description: None,
                graph: None,
            })
            .expect_err("empty name should fail");
        assert!(format!("{err:?}").contains("name must not be empty"));
    }

    /// `update` must refuse an empty replacement name.
    #[test]
    fn update_rejects_empty_name() {
        let (_temp, mut domain, id) = fixture_with_collection();
        let err = domain
            .update(CollectionUpdateParams {
                id,
                name: Some(String::new()),
                description: None,
                graph: None,
            })
            .expect_err("empty name should fail");
        assert!(format!("{err:?}").contains("name must not be empty"));
    }

    /// Pointing `run` at a non-existent binary must produce the dedicated
    /// spawn-failure message.
    #[test]
    fn run_handles_missing_ralph_binary() {
        let (temp, domain, id) = fixture_with_collection();
        let missing = temp.path().join("definitely-not-here");

        let result = domain.run(
            CollectionRunParams {
                id,
                prompt: "hello".to_string(),
            },
            missing.to_str().expect("missing path"),
            temp.path(),
        );

        let err = result.expect_err("missing binary should fail");
        let message = format!("{err:?}");
        assert!(
            message.contains("ralph CLI not found"),
            "error should name the spawn failure; got: {message}"
        );
    }
}