Skip to main content

ff_script/
loader.rs

1//! Library loader: ensures the `flowfabric` Lua function library is loaded
2//! on the Valkey server at the expected version.
3//!
4//! Pattern: version-check → load-if-needed → verify (matches glide-mq).
5
6use ferriskey::Client;
7
8use crate::{LIBRARY_SOURCE, LIBRARY_VERSION};
9
10/// Errors from the library loader.
11#[derive(Debug, thiserror::Error)]
12pub enum LoadError {
13    /// ferriskey returned an error unrelated to "function not found".
14    /// Preserves `ferriskey::ErrorKind` so callers can distinguish
15    /// timeout / auth / NOSCRIPT / cluster-redirect on library-load failures.
16    #[error("valkey: {0}")]
17    Valkey(#[from] ferriskey::Error),
18    /// Version mismatch after load — another process may have loaded a
19    /// different version concurrently.
20    #[error("version mismatch after load: expected {expected}, got {got}")]
21    VersionMismatch { expected: String, got: String },
22}
23
24impl LoadError {
25    /// Returns the underlying ferriskey ErrorKind, if this is a Valkey error.
26    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
27        match self {
28            Self::Valkey(e) => Some(e.kind()),
29            _ => None,
30        }
31    }
32}
33
34/// Ensure the `flowfabric` function library is loaded on the server and at
35/// the expected version.
36///
37/// 1. `FCALL ff_version 0` — check if library is loaded and version matches
38/// 2. If missing or version mismatch → `FUNCTION LOAD REPLACE` (with retry)
39/// 3. Verify by calling `ff_version` again
40pub async fn ensure_library(client: &Client) -> Result<(), LoadError> {
41    match check_version(client).await {
42        Ok(true) => {
43            tracing::debug!("flowfabric library already loaded at version {LIBRARY_VERSION}");
44            return Ok(());
45        }
46        Ok(false) => {
47            tracing::info!("flowfabric library version mismatch, reloading");
48        }
49        Err(_) => {
50            tracing::info!("flowfabric library not loaded, loading");
51        }
52    }
53
54    // Load the library with retry for transient errors
55    const MAX_ATTEMPTS: u32 = 3;
56    let mut last_err = None;
57    for attempt in 1..=MAX_ATTEMPTS {
58        match client.function_load_replace(LIBRARY_SOURCE).await {
59            Ok(_name) => {
60                last_err = None;
61                break;
62            }
63            Err(e) => {
64                if is_permanent_load_error(&e) {
65                    tracing::error!(attempt, error = %e, "FUNCTION LOAD failed with permanent error");
66                    return Err(LoadError::Valkey(e));
67                }
68                tracing::warn!(
69                    attempt,
70                    max_attempts = MAX_ATTEMPTS,
71                    error = %e,
72                    "FUNCTION LOAD failed (transient), retrying in 1s"
73                );
74                last_err = Some(e);
75                if attempt < MAX_ATTEMPTS {
76                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
77                }
78            }
79        }
80    }
81    if let Some(e) = last_err {
82        return Err(LoadError::Valkey(e));
83    }
84
85    // Verify
86    match check_version(client).await {
87        Ok(true) => {
88            tracing::info!("flowfabric library loaded successfully (version {LIBRARY_VERSION})");
89            Ok(())
90        }
91        Ok(false) => {
92            let got = get_version_string(client).await.unwrap_or_default();
93            Err(LoadError::VersionMismatch {
94                expected: LIBRARY_VERSION.to_string(),
95                got,
96            })
97        }
98        Err(e) => Err(LoadError::Valkey(e)),
99    }
100}
101
102/// Check if a FUNCTION LOAD error is permanent (syntax error in Lua) vs transient.
103fn is_permanent_load_error(e: &ferriskey::Error) -> bool {
104    let msg = e.to_string();
105    msg.contains("Error compiling") || msg.contains("syntax error") || msg.contains("ERR Error")
106}
107
108/// Check if ff_version returns the expected version.
109/// Returns Ok(true) if match, Ok(false) if mismatch, Err if function missing.
110async fn check_version(client: &Client) -> Result<bool, ferriskey::Error> {
111    let result: String = client
112        .fcall("ff_version", &[] as &[&str], &[] as &[&str])
113        .await?;
114    Ok(result == LIBRARY_VERSION)
115}
116
117/// Get the version string for error reporting.
118async fn get_version_string(client: &Client) -> Result<String, ferriskey::Error> {
119    let result: String = client
120        .fcall("ff_version", &[] as &[&str], &[] as &[&str])
121        .await?;
122    Ok(result)
123}