1use ferriskey::Client;
7
8use crate::{LIBRARY_SOURCE, LIBRARY_VERSION};
9
10#[derive(Debug, thiserror::Error)]
12pub enum LoadError {
13 #[error("valkey: {0}")]
17 Valkey(#[from] ferriskey::Error),
18 #[error("version mismatch after load: expected {expected}, got {got}")]
21 VersionMismatch { expected: String, got: String },
22}
23
24impl LoadError {
25 pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
27 match self {
28 Self::Valkey(e) => Some(e.kind()),
29 _ => None,
30 }
31 }
32}
33
34pub async fn ensure_library(client: &Client) -> Result<(), LoadError> {
41 match check_version(client).await {
42 Ok(true) => {
43 tracing::debug!("flowfabric library already loaded at version {LIBRARY_VERSION}");
44 return Ok(());
45 }
46 Ok(false) => {
47 tracing::info!("flowfabric library version mismatch, reloading");
48 }
49 Err(_) => {
50 tracing::info!("flowfabric library not loaded, loading");
51 }
52 }
53
54 const MAX_ATTEMPTS: u32 = 3;
56 let mut last_err = None;
57 for attempt in 1..=MAX_ATTEMPTS {
58 match client.function_load_replace(LIBRARY_SOURCE).await {
59 Ok(_name) => {
60 last_err = None;
61 break;
62 }
63 Err(e) => {
64 if is_permanent_load_error(&e) {
65 tracing::error!(attempt, error = %e, "FUNCTION LOAD failed with permanent error");
66 return Err(LoadError::Valkey(e));
67 }
68 tracing::warn!(
69 attempt,
70 max_attempts = MAX_ATTEMPTS,
71 error = %e,
72 "FUNCTION LOAD failed (transient), retrying in 1s"
73 );
74 last_err = Some(e);
75 if attempt < MAX_ATTEMPTS {
76 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
77 }
78 }
79 }
80 }
81 if let Some(e) = last_err {
82 return Err(LoadError::Valkey(e));
83 }
84
85 match check_version(client).await {
87 Ok(true) => {
88 tracing::info!("flowfabric library loaded successfully (version {LIBRARY_VERSION})");
89 Ok(())
90 }
91 Ok(false) => {
92 let got = get_version_string(client).await.unwrap_or_default();
93 Err(LoadError::VersionMismatch {
94 expected: LIBRARY_VERSION.to_string(),
95 got,
96 })
97 }
98 Err(e) => Err(LoadError::Valkey(e)),
99 }
100}
101
102fn is_permanent_load_error(e: &ferriskey::Error) -> bool {
104 let msg = e.to_string();
105 msg.contains("Error compiling") || msg.contains("syntax error") || msg.contains("ERR Error")
106}
107
108async fn check_version(client: &Client) -> Result<bool, ferriskey::Error> {
111 let result: String = client
112 .fcall("ff_version", &[] as &[&str], &[] as &[&str])
113 .await?;
114 Ok(result == LIBRARY_VERSION)
115}
116
117async fn get_version_string(client: &Client) -> Result<String, ferriskey::Error> {
119 let result: String = client
120 .fcall("ff_version", &[] as &[&str], &[] as &[&str])
121 .await?;
122 Ok(result)
123}