Skip to main content

pitchfork_cli/
state_file.rs

1use crate::daemon::Daemon;
2use crate::daemon_id::DaemonId;
3use crate::error::FileError;
4use crate::{Result, env};
5use once_cell::sync::Lazy;
6use std::collections::{BTreeMap, BTreeSet};
7use std::fmt::Debug;
8use std::path::{Path, PathBuf};
9use std::sync::Mutex;
10use std::sync::atomic::{AtomicBool, Ordering};
11
12#[derive(Debug, serde::Serialize, serde::Deserialize)]
13pub struct StateFile {
14    #[serde(default)]
15    pub daemons: BTreeMap<DaemonId, Daemon>,
16    #[serde(default)]
17    pub disabled: BTreeSet<DaemonId>,
18    #[serde(default)]
19    pub shell_dirs: BTreeMap<String, PathBuf>,
20    #[serde(skip)]
21    pub(crate) path: PathBuf,
22    #[serde(skip)]
23    pub(crate) dirty: AtomicBool,
24    /// Snapshot of the last written TOML content. Used by `write()` to skip
25    /// redundant disk I/O when the serialized state hasn't changed.
26    /// Guarded by the file lock in practice; `Mutex` is used only to satisfy
27    /// `Sync` since `write` takes `&self`.
28    #[serde(skip)]
29    pub(crate) last_content: Mutex<Option<String>>,
30}
31
32impl StateFile {
33    pub fn new(path: PathBuf) -> Self {
34        Self {
35            daemons: Default::default(),
36            disabled: Default::default(),
37            shell_dirs: Default::default(),
38            path,
39            dirty: AtomicBool::new(false),
40            last_content: Mutex::new(None),
41        }
42    }
43
44    pub fn get() -> &'static Self {
45        static STATE_FILE: Lazy<StateFile> = Lazy::new(|| {
46            let path = &*env::PITCHFORK_STATE_FILE;
47            StateFile::read(path).unwrap_or_else(|e| {
48                error!(
49                    "failed to read state file {}: {}. Falling back to in-memory empty state",
50                    path.display(),
51                    e
52                );
53                StateFile::new(path.to_path_buf())
54            })
55        });
56        &STATE_FILE
57    }
58
59    pub fn read<P: AsRef<Path>>(path: P) -> Result<Self> {
60        let path = path.as_ref();
61        if !path.exists() {
62            return Ok(Self::new(path.to_path_buf()));
63        }
64        let canonical_path = normalized_lock_path(path);
65        let _lock = xx::fslock::get(&canonical_path, false)?;
66        let raw = xx::file::read_to_string(path).unwrap_or_else(|e| {
67            warn!("Error reading state file {path:?}: {e}");
68            String::new()
69        });
70
71        // Try to parse directly (new format with qualified IDs)
72        match toml::from_str::<Self>(&raw) {
73            Ok(mut state_file) => {
74                state_file.path = path.to_path_buf();
75                state_file.dirty = AtomicBool::new(false);
76                for (id, daemon) in state_file.daemons.iter_mut() {
77                    daemon.id = id.clone();
78                }
79                // Seed last_content with the raw TOML so the first write() can
80                // skip disk I/O when the state hasn't actually changed.
81                state_file.last_content = Mutex::new(Some(raw));
82                Ok(state_file)
83            }
84            Err(parse_err) => {
85                if Self::looks_like_old_format(&raw) {
86                    // Silent migration: attempt to rewrite bare keys as legacy/<name>
87                    debug!(
88                        "State file at {} appears to be in old format, attempting silent migration",
89                        path.display()
90                    );
91                    match Self::migrate_old_format(&raw) {
92                        Ok(migrated) => {
93                            let mut state_file = migrated;
94                            state_file.path = path.to_path_buf();
95                            // Persist migrated state while we still hold the lock
96                            if let Err(e) = state_file.write_unlocked() {
97                                warn!("State file migration write failed: {e}");
98                            }
99                            debug!("State file migrated successfully");
100                            return Ok(state_file);
101                        }
102                        Err(e) => {
103                            error!(
104                                "State file migration failed: {e}. \
105                                 Raw content preserved at {}. Starting with empty state.",
106                                path.display()
107                            );
108                            return Err(miette::miette!(
109                                "Failed to migrate state file {}: {e}",
110                                path.display()
111                            ));
112                        }
113                    }
114                }
115                // New-format parse failure: do NOT silently discard state.
116                Err(miette::miette!(
117                    "Failed to parse state file {}: {parse_err}",
118                    path.display()
119                ))
120            }
121        }
122    }
123
124    /// Returns true if the TOML looks like the old state file format, i.e. the
125    /// `daemons` table has at least one key that is missing the `namespace/`
126    /// prefix.  Detection is done by parsing as a generic `toml::Value` so it
127    /// works regardless of how the table headers are written.
128    fn looks_like_old_format(raw: &str) -> bool {
129        use toml::Value;
130        let Ok(Value::Table(doc)) = toml::from_str::<Value>(raw) else {
131            return false;
132        };
133        let Some(Value::Table(daemons)) = doc.get("daemons") else {
134            return false;
135        };
136        // Old format: at least one daemon key has no '/'
137        !daemons.is_empty() && daemons.keys().any(|k| !k.contains('/'))
138    }
139
140    /// Parse old-format state TOML (bare daemon names) and return a new-format
141    /// `StateFile` with daemon IDs qualified under the `"legacy"` namespace.
142    fn migrate_old_format(raw: &str) -> Result<Self> {
143        use toml::Value;
144
145        const LEGACY_NAMESPACE: &str = "legacy";
146
147        // Parse as generic TOML value
148        let mut doc: toml::map::Map<String, Value> = toml::from_str(raw)
149            .map_err(|e| miette::miette!("failed to parse old state file: {e}"))?;
150
151        // Re-key [daemons] entries: "name" -> "legacy/name"
152        if let Some(Value::Table(daemons)) = doc.get_mut("daemons") {
153            let old_keys: Vec<String> = daemons.keys().cloned().collect();
154            for key in old_keys {
155                if !key.contains('/')
156                    && let Some(val) = daemons.remove(&key)
157                {
158                    let mut new_key = format!("{LEGACY_NAMESPACE}/{key}");
159                    // Preserve data on collision by assigning a unique migrated key.
160                    if daemons.contains_key(&new_key) {
161                        let base = format!("{key}-legacy");
162                        let mut candidate = format!("{LEGACY_NAMESPACE}/{base}");
163                        let mut n: u32 = 2;
164                        while daemons.contains_key(&candidate) {
165                            candidate = format!("{LEGACY_NAMESPACE}/{base}-{n}");
166                            n += 1;
167                        }
168                        warn!(
169                            "Legacy daemon key '{}' collides with '{}'; migrating as '{}'",
170                            key,
171                            format_args!("{LEGACY_NAMESPACE}/{key}"),
172                            candidate
173                        );
174                        new_key = candidate;
175                    }
176                    // Update the inner `id` field too
177                    let val = if let Value::Table(mut tbl) = val {
178                        tbl.insert("id".to_string(), Value::String(new_key.clone()));
179                        Value::Table(tbl)
180                    } else {
181                        val
182                    };
183                    daemons.insert(new_key, val);
184                }
185            }
186        }
187
188        // Re-key [disabled] set entries the same way
189        if let Some(Value::Array(disabled)) = doc.get_mut("disabled") {
190            for entry in disabled.iter_mut() {
191                if let Value::String(s) = entry
192                    && !s.contains('/')
193                {
194                    *s = format!("{LEGACY_NAMESPACE}/{s}");
195                }
196            }
197        }
198
199        let new_raw =
200            toml::to_string(&Value::Table(doc)).map_err(|e| FileError::SerializeError {
201                path: PathBuf::new(),
202                source: e,
203            })?;
204
205        let mut state_file: Self = toml::from_str(&new_raw)
206            .map_err(|e| miette::miette!("failed to parse migrated state file: {e}"))?;
207        // Sync inner daemon id fields
208        for (id, daemon) in state_file.daemons.iter_mut() {
209            daemon.id = id.clone();
210        }
211        Ok(state_file)
212    }
213
214    /// Mark the state file as dirty so the background flush task will
215    /// persist it on the next tick.
216    fn mark_dirty(&self) {
217        self.dirty.store(true, Ordering::Relaxed);
218    }
219
220    /// Check whether the state file needs to be flushed.
221    pub fn is_dirty(&self) -> bool {
222        self.dirty.load(Ordering::Relaxed)
223    }
224
225    /// Insert or replace a daemon entry and mark the state dirty.
226    pub fn insert_daemon(&mut self, id: &DaemonId, daemon: Daemon) {
227        self.daemons.insert(id.clone(), daemon);
228        self.mark_dirty();
229    }
230
231    /// Remove a daemon entry and mark the state dirty if the daemon existed.
232    pub fn remove_daemon(&mut self, id: &DaemonId) {
233        if self.daemons.remove(id).is_some() {
234            self.mark_dirty();
235        }
236    }
237
238    /// Disable a daemon (add to disabled set) and mark the state dirty.
239    /// Returns true if the daemon was not already disabled.
240    pub fn disable_daemon(&mut self, id: &DaemonId) -> bool {
241        let inserted = self.disabled.insert(id.clone());
242        if inserted {
243            self.mark_dirty();
244        }
245        inserted
246    }
247
248    /// Enable a daemon (remove from disabled set) and mark the state dirty.
249    /// Returns true if the daemon was previously disabled.
250    pub fn enable_daemon(&mut self, id: &DaemonId) -> bool {
251        let removed = self.disabled.remove(id);
252        if removed {
253            self.mark_dirty();
254        }
255        removed
256    }
257
258    /// Set the active port for a daemon and mark the state dirty.
259    /// Returns true if the daemon was found and updated.
260    pub fn set_active_port(&mut self, id: &DaemonId, port: u16) -> bool {
261        if let Some(d) = self.daemons.get_mut(id) {
262            d.active_port = Some(port);
263            self.mark_dirty();
264            true
265        } else {
266            false
267        }
268    }
269
270    /// Clear the active port for a daemon and mark the state dirty.
271    /// Returns true if the daemon was found and updated.
272    pub fn clear_active_port(&mut self, id: &DaemonId) -> bool {
273        if let Some(d) = self.daemons.get_mut(id) {
274            d.active_port = None;
275            self.mark_dirty();
276            true
277        } else {
278            false
279        }
280    }
281
282    /// Update the last cron trigger time for a daemon and mark the state dirty.
283    /// Returns true if the daemon was found and updated.
284    pub fn set_last_cron_triggered(
285        &mut self,
286        id: &DaemonId,
287        time: chrono::DateTime<chrono::Local>,
288    ) -> bool {
289        if let Some(d) = self.daemons.get_mut(id) {
290            d.last_cron_triggered = Some(time);
291            self.mark_dirty();
292            true
293        } else {
294            false
295        }
296    }
297
298    /// Set a shell working directory and mark the state dirty.
299    pub fn set_shell_dir(&mut self, shell_pid: u32, dir: PathBuf) {
300        self.shell_dirs.insert(shell_pid.to_string(), dir);
301        self.mark_dirty();
302    }
303
304    /// Remove a shell working directory and mark the state dirty.
305    /// Returns true if the entry existed.
306    pub fn remove_shell_dir(&mut self, shell_pid: u32) -> bool {
307        let removed = self.shell_dirs.remove(&shell_pid.to_string()).is_some();
308        if removed {
309            self.mark_dirty();
310        }
311        removed
312    }
313
314    /// Retain only daemons matching the predicate and mark the state dirty
315    /// if any were removed.
316    pub fn retain_daemons<F>(&mut self, mut f: F)
317    where
318        F: FnMut(&DaemonId, &Daemon) -> bool,
319    {
320        let before = self.daemons.len();
321        self.daemons.retain(|id, daemon| f(id, daemon));
322        if self.daemons.len() != before {
323            self.mark_dirty();
324        }
325    }
326
327    /// Synchronous force-write. Clears the dirty flag. If the serialized
328    /// content matches the last written content, the disk write is skipped to
329    /// avoid unnecessary I/O. Used during shutdown and migration where async
330    /// flushing is not available.
331    pub fn write(&self) -> Result<()> {
332        let canonical_path = normalized_lock_path(&self.path);
333        let _lock = xx::fslock::get(&canonical_path, false)?;
334        let raw = toml::to_string(self).map_err(|e| FileError::SerializeError {
335            path: self.path.clone(),
336            source: e,
337        })?;
338        if self
339            .last_content
340            .lock()
341            .unwrap()
342            .as_ref()
343            .is_some_and(|last| last == &raw)
344        {
345            // No real change — just clear the dirty flag
346            self.dirty.store(false, Ordering::Relaxed);
347            return Ok(());
348        }
349        Self::write_raw(&self.path, &raw)?;
350        *self.last_content.lock().unwrap() = Some(raw);
351        self.dirty.store(false, Ordering::Relaxed);
352        Ok(())
353    }
354
355    /// Write the state file without acquiring the lock.
356    /// Used internally when the lock is already held (e.g., during migration in read()).
357    fn write_unlocked(&self) -> Result<()> {
358        let raw = toml::to_string(self).map_err(|e| FileError::SerializeError {
359            path: self.path.clone(),
360            source: e,
361        })?;
362        Self::write_raw(&self.path, &raw)?;
363        *self.last_content.lock().unwrap() = Some(raw);
364        self.dirty.store(false, Ordering::Relaxed);
365        Ok(())
366    }
367
368    /// Perform the actual file I/O (temp file + atomic rename).
369    /// **The caller MUST hold the file lock** (via `xx::fslock::get`) before
370    /// calling this function; otherwise concurrent writes may corrupt the file.
371    pub(crate) fn write_raw(path: &Path, raw: &str) -> Result<()> {
372        if let Some(parent) = path.parent() {
373            std::fs::create_dir_all(parent).map_err(|e| FileError::WriteError {
374                path: parent.to_path_buf(),
375                details: Some(format!("failed to create state file directory: {e}")),
376            })?;
377        }
378        let temp_path = path.with_extension("toml.tmp");
379        xx::file::write(&temp_path, raw).map_err(|e| FileError::WriteError {
380            path: temp_path.clone(),
381            details: Some(e.to_string()),
382        })?;
383        std::fs::rename(&temp_path, path).map_err(|e| FileError::WriteError {
384            path: path.to_path_buf(),
385            details: Some(format!("failed to rename temp file: {e}")),
386        })?;
387        Ok(())
388    }
389}
390
/// Derive a stable path to use as the lock key for `path`.
///
/// Resolution order:
/// 1. the canonical form of `path`, if it can be canonicalized;
/// 2. otherwise the canonical form of its parent joined with its file name,
///    if the parent can be canonicalized and a file name is present;
/// 3. otherwise `path` unchanged.
fn normalized_lock_path(path: &Path) -> PathBuf {
    path.canonicalize().unwrap_or_else(|_| {
        let resolved_parent = path.parent().and_then(|dir| dir.canonicalize().ok());
        match (resolved_parent, path.file_name()) {
            (Some(dir), Some(name)) => dir.join(name),
            _ => path.to_path_buf(),
        }
    })
}
405
/// Unit tests for TOML round-tripping, old-format detection and migration.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon_status::DaemonStatus;

    /// A state with one stopped daemon survives serialize -> deserialize.
    #[test]
    fn test_state_file_toml_roundtrip_stopped() {
        let mut state = StateFile::new(PathBuf::from("/tmp/test.toml"));
        let daemon_id = DaemonId::new("project", "test");
        state.daemons.insert(
            daemon_id.clone(),
            Daemon {
                id: daemon_id,
                status: DaemonStatus::Stopped,
                last_exit_success: Some(true),
                user: Some("postgres".to_string()),
                ..Daemon::default()
            },
        );

        let toml_str = toml::to_string(&state).unwrap();
        let parsed: StateFile = toml::from_str(&toml_str).expect("Failed to parse TOML");

        assert!(
            parsed
                .daemons
                .contains_key(&DaemonId::new("project", "test"))
        );
        let daemon = parsed
            .daemons
            .get(&DaemonId::new("project", "test"))
            .unwrap();
        assert_eq!(daemon.user.as_deref(), Some("postgres"));
    }

    /// A bare daemon key is recognised as the old format.
    #[test]
    fn test_looks_like_old_format_bare_names() {
        let old = r#"
[daemons.api]
id = "api"
autostop = false
retry = 0
retry_count = 0
status = "stopped"
"#;
        assert!(StateFile::looks_like_old_format(old));
    }

    /// Qualified daemon keys are not mistaken for the old format.
    #[test]
    fn test_looks_like_old_format_new_format() {
        let new = r#"
    disabled = []

    [daemons."legacy/api"]
    id = "legacy/api"
autostop = false
retry = 0
retry_count = 0
status = "stopped"
"#;
        assert!(!StateFile::looks_like_old_format(new));
    }

    /// Input without any daemons is never treated as the old format.
    #[test]
    fn test_looks_like_old_format_empty() {
        assert!(!StateFile::looks_like_old_format(""));
        assert!(!StateFile::looks_like_old_format("[shell_dirs]"));
    }

    /// Bare daemon keys are moved under the `legacy` namespace.
    #[test]
    fn test_migrate_old_format_basic() {
        let old = r#"
[daemons.api]
id = "api"
autostop = false
retry = 0
retry_count = 0
status = "stopped"

[daemons.worker]
id = "worker"
autostop = false
retry = 0
retry_count = 0
status = "stopped"
last_exit_success = true
"#;
        let migrated = StateFile::migrate_old_format(old).expect("migration should succeed");
        assert!(
            migrated
                .daemons
                .contains_key(&DaemonId::new("legacy", "api")),
            "api should be migrated to legacy/api"
        );
        assert!(
            migrated
                .daemons
                .contains_key(&DaemonId::new("legacy", "worker")),
            "worker should be migrated to legacy/worker"
        );
        assert_eq!(migrated.daemons.len(), 2);
    }

    /// Bare `disabled` entries are qualified, even without a matching daemon.
    #[test]
    fn test_migrate_old_format_preserves_disabled() {
        let old = r#"
disabled = ["api", "worker"]

[daemons.api]
id = "api"
autostop = false
retry = 0
retry_count = 0
status = "stopped"
"#;
        let migrated = StateFile::migrate_old_format(old).expect("migration should succeed");
        assert!(
            migrated.disabled.contains(&DaemonId::new("legacy", "api")),
            "disabled 'api' should become 'legacy/api'"
        );
        assert!(
            migrated
                .disabled
                .contains(&DaemonId::new("legacy", "worker")),
            "disabled 'worker' should become 'legacy/worker'"
        );
    }

    /// A key that already carries a namespace must not be prefixed again,
    /// while a bare key in the same file is still migrated.
    #[test]
    fn test_migrate_old_format_already_qualified_unchanged() {
        let mixed = r#"
[daemons.bare]
id = "bare"
autostop = false
retry = 0
retry_count = 0
status = "stopped"

[daemons."project/api"]
id = "project/api"
autostop = false
retry = 0
retry_count = 0
status = "stopped"
"#;
        let migrated = StateFile::migrate_old_format(mixed).expect("migration should succeed");
        // "bare" -> "legacy/bare"
        assert!(
            migrated
                .daemons
                .contains_key(&DaemonId::new("legacy", "bare")),
            "bare key should become legacy/bare"
        );
        // "project/api" stays exactly as it was
        assert!(
            migrated
                .daemons
                .contains_key(&DaemonId::new("project", "api")),
            "already-qualified key should be left unchanged"
        );
        // No extra (double-prefixed) entry was created
        assert_eq!(migrated.daemons.len(), 2);
    }

    /// A bare key colliding with an existing qualified key keeps both entries.
    #[test]
    fn test_migrate_old_format_does_not_overwrite_existing_qualified_entry() {
        let mixed = r#"
[daemons.api]
id = "api"
cmd = ["echo", "old"]
autostop = false
retry = 0
retry_count = 0
status = "stopped"

[daemons."legacy/api"]
id = "legacy/api"
cmd = ["echo", "new"]
autostop = false
retry = 0
retry_count = 0
status = "stopped"
"#;

        let migrated = StateFile::migrate_old_format(mixed).expect("migration should succeed");
        let key = DaemonId::new("legacy", "api");
        let daemon = migrated.daemons.get(&key).expect("legacy/api should exist");

        let cmd = daemon.cmd.as_ref().expect("cmd should exist");
        assert_eq!(cmd, &vec!["echo".to_string(), "new".to_string()]);

        // Colliding bare key should be preserved under a unique migrated key.
        let preserved = DaemonId::new("legacy", "api-legacy");
        let preserved_daemon = migrated
            .daemons
            .get(&preserved)
            .expect("colliding bare key should be preserved as legacy/api-legacy");
        let preserved_cmd = preserved_daemon
            .cmd
            .as_ref()
            .expect("preserved cmd should exist");
        assert_eq!(preserved_cmd, &vec!["echo".to_string(), "old".to_string()]);
        assert_eq!(migrated.daemons.len(), 2);
    }
}
600}