Skip to main content

ralph_api/
collection_domain.rs

1mod yaml;
2
3use std::collections::BTreeMap;
4use std::fs;
5use std::path::{Path, PathBuf};
6
7use chrono::Utc;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use tracing::warn;
11
12use crate::errors::ApiError;
13use crate::loop_support::now_ts;
14
15use self::yaml::{export_collection_yaml, graph_from_yaml};
16
17#[derive(Debug, Clone, Deserialize)]
18#[serde(rename_all = "camelCase")]
19pub struct CollectionCreateParams {
20    pub name: String,
21    pub description: Option<String>,
22    pub graph: Option<Value>,
23}
24
25#[derive(Debug, Clone, Deserialize)]
26#[serde(rename_all = "camelCase")]
27pub struct CollectionUpdateParams {
28    pub id: String,
29    pub name: Option<String>,
30    pub description: Option<String>,
31    pub graph: Option<Value>,
32}
33
34#[derive(Debug, Clone, Deserialize)]
35#[serde(rename_all = "camelCase")]
36pub struct CollectionImportParams {
37    pub yaml: String,
38    pub name: String,
39    pub description: Option<String>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct CollectionSummary {
45    pub id: String,
46    pub name: String,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub description: Option<String>,
49    pub created_at: String,
50    pub updated_at: String,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub struct CollectionRecord {
56    pub id: String,
57    pub name: String,
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub description: Option<String>,
60    pub graph: GraphData,
61    pub created_at: String,
62    pub updated_at: String,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66#[serde(rename_all = "camelCase")]
67pub struct GraphData {
68    pub nodes: Vec<GraphNode>,
69    pub edges: Vec<GraphEdge>,
70    pub viewport: Viewport,
71}
72
73impl Default for GraphData {
74    fn default() -> Self {
75        Self {
76            nodes: Vec::new(),
77            edges: Vec::new(),
78            viewport: Viewport {
79                x: 0.0,
80                y: 0.0,
81                zoom: 1.0,
82            },
83        }
84    }
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88#[serde(rename_all = "camelCase")]
89pub struct GraphNode {
90    pub id: String,
91    #[serde(rename = "type")]
92    pub node_type: String,
93    pub position: NodePosition,
94    pub data: HatNodeData,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(rename_all = "camelCase")]
99pub struct NodePosition {
100    pub x: f64,
101    pub y: f64,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105#[serde(rename_all = "camelCase")]
106pub struct HatNodeData {
107    pub key: String,
108    pub name: String,
109    pub description: String,
110    pub triggers_on: Vec<String>,
111    pub publishes: Vec<String>,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub instructions: Option<String>,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(rename_all = "camelCase")]
118pub struct GraphEdge {
119    pub id: String,
120    pub source: String,
121    pub target: String,
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub source_handle: Option<String>,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub target_handle: Option<String>,
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub label: Option<String>,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
131#[serde(rename_all = "camelCase")]
132pub struct Viewport {
133    pub x: f64,
134    pub y: f64,
135    pub zoom: f64,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize, Default)]
139#[serde(rename_all = "camelCase")]
140struct CollectionSnapshot {
141    collections: Vec<CollectionRecord>,
142    id_counter: u64,
143}
144
145pub struct CollectionDomain {
146    store_path: PathBuf,
147    collections: BTreeMap<String, CollectionRecord>,
148    id_counter: u64,
149}
150
151impl CollectionDomain {
152    pub fn new(workspace_root: impl AsRef<Path>) -> Self {
153        let store_path = workspace_root
154            .as_ref()
155            .join(".ralph/api/collections-v1.json");
156        let mut domain = Self {
157            store_path,
158            collections: BTreeMap::new(),
159            id_counter: 0,
160        };
161        domain.load();
162        domain
163    }
164
165    pub fn list(&self) -> Vec<CollectionSummary> {
166        let mut entries: Vec<_> = self
167            .collections
168            .values()
169            .map(|collection| CollectionSummary {
170                id: collection.id.clone(),
171                name: collection.name.clone(),
172                description: collection.description.clone(),
173                created_at: collection.created_at.clone(),
174                updated_at: collection.updated_at.clone(),
175            })
176            .collect();
177
178        entries.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
179        entries
180    }
181
182    pub fn get(&self, id: &str) -> Result<CollectionRecord, ApiError> {
183        self.collections
184            .get(id)
185            .cloned()
186            .ok_or_else(|| collection_not_found_error(id))
187    }
188
189    pub fn create(&mut self, params: CollectionCreateParams) -> Result<CollectionRecord, ApiError> {
190        let graph = params
191            .graph
192            .map(parse_graph)
193            .transpose()?
194            .unwrap_or_default();
195
196        let now = now_ts();
197        let id = self.next_collection_id();
198
199        let record = CollectionRecord {
200            id: id.clone(),
201            name: params.name,
202            description: params.description,
203            graph,
204            created_at: now.clone(),
205            updated_at: now,
206        };
207
208        self.collections.insert(id.clone(), record);
209        self.persist()?;
210        self.get(&id)
211    }
212
213    pub fn update(&mut self, params: CollectionUpdateParams) -> Result<CollectionRecord, ApiError> {
214        let record = self
215            .collections
216            .get_mut(&params.id)
217            .ok_or_else(|| collection_not_found_error(&params.id))?;
218
219        if let Some(name) = params.name {
220            record.name = name;
221        }
222
223        if let Some(description) = params.description {
224            record.description = Some(description);
225        }
226
227        if let Some(graph) = params.graph {
228            record.graph = parse_graph(graph)?;
229        }
230
231        record.updated_at = now_ts();
232        self.persist()?;
233        self.get(&params.id)
234    }
235
236    pub fn delete(&mut self, id: &str) -> Result<(), ApiError> {
237        if self.collections.remove(id).is_none() {
238            return Err(collection_not_found_error(id));
239        }
240
241        self.persist()
242    }
243
244    pub fn import(&mut self, params: CollectionImportParams) -> Result<CollectionRecord, ApiError> {
245        let graph = graph_from_yaml(&params.yaml)?;
246        self.create(CollectionCreateParams {
247            name: params.name,
248            description: params.description,
249            graph: Some(serde_json::to_value(graph).map_err(|error| {
250                ApiError::internal(format!("failed serializing graph: {error}"))
251            })?),
252        })
253    }
254
255    pub fn export(&self, id: &str) -> Result<String, ApiError> {
256        let collection = self.get(id)?;
257        export_collection_yaml(&collection)
258    }
259
260    fn next_collection_id(&mut self) -> String {
261        self.id_counter = self.id_counter.saturating_add(1);
262        format!(
263            "collection-{}-{:04x}",
264            Utc::now().timestamp_millis(),
265            self.id_counter
266        )
267    }
268
269    fn load(&mut self) {
270        if !self.store_path.exists() {
271            return;
272        }
273
274        let content = match fs::read_to_string(&self.store_path) {
275            Ok(content) => content,
276            Err(error) => {
277                warn!(
278                    path = %self.store_path.display(),
279                    %error,
280                    "failed reading collection snapshot"
281                );
282                return;
283            }
284        };
285
286        let snapshot: CollectionSnapshot = match serde_json::from_str(&content) {
287            Ok(snapshot) => snapshot,
288            Err(error) => {
289                warn!(
290                    path = %self.store_path.display(),
291                    %error,
292                    "failed parsing collection snapshot"
293                );
294                return;
295            }
296        };
297
298        self.collections = snapshot
299            .collections
300            .into_iter()
301            .map(|collection| (collection.id.clone(), collection))
302            .collect();
303        self.id_counter = snapshot.id_counter;
304    }
305
306    fn persist(&self) -> Result<(), ApiError> {
307        if let Some(parent) = self.store_path.parent() {
308            fs::create_dir_all(parent).map_err(|error| {
309                ApiError::internal(format!(
310                    "failed creating collection snapshot directory '{}': {error}",
311                    parent.display()
312                ))
313            })?;
314        }
315
316        let snapshot = CollectionSnapshot {
317            collections: self.sorted_records(),
318            id_counter: self.id_counter,
319        };
320
321        let payload = serde_json::to_string_pretty(&snapshot).map_err(|error| {
322            ApiError::internal(format!("failed serializing collections snapshot: {error}"))
323        })?;
324
325        fs::write(&self.store_path, payload).map_err(|error| {
326            ApiError::internal(format!(
327                "failed writing collection snapshot '{}': {error}",
328                self.store_path.display()
329            ))
330        })
331    }
332
333    fn sorted_records(&self) -> Vec<CollectionRecord> {
334        let mut records: Vec<_> = self.collections.values().cloned().collect();
335        records.sort_by(|a, b| a.name.cmp(&b.name).then(a.id.cmp(&b.id)));
336        records
337    }
338}
339
340fn parse_graph(raw: Value) -> Result<GraphData, ApiError> {
341    serde_json::from_value(raw)
342        .map_err(|error| ApiError::invalid_params(format!("invalid collection graph: {error}")))
343}
344
345fn collection_not_found_error(collection_id: &str) -> ApiError {
346    ApiError::collection_not_found(format!("Collection with id '{collection_id}' not found"))
347        .with_details(serde_json::json!({ "collectionId": collection_id }))
348}