1use derive_more::Display;
2use eyre::Result;
3use regex::Regex;
4use sea_orm::{entity::prelude::*, Condition, Set};
5use sea_orm_migration::prelude::{Expr, OnConflict};
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::collections::HashMap;
9
10use crate::{models::PrimaryId, utils, BlockHeight, Db};
11
12#[derive(Display, Debug, Copy, Clone, PartialEq, Eq, Hash)]
13pub enum ConfigKey {
14 #[display(fmt = "primary")]
15 Primary,
16 #[display(fmt = "label_fetched_l{}", "_0")]
17 LabelFetched(PrimaryId),
18 #[display(fmt = "indexer_tail_sync_n{}", "_0")]
19 IndexerTailSync(PrimaryId),
20 #[display(fmt = "indexer_chunk_sync_n{}_b{}", "_0", "_1")]
21 IndexerChunkSync(PrimaryId, BlockHeight),
22 #[display(fmt = "indexer_module_sync_n{}_m{}", "_0", "_1")]
23 IndexerModuleSync(PrimaryId, u16),
24 #[display(fmt = "indexer_module_synced_n{}_m{}", "_0", "_1")]
25 IndexerModuleSynced(PrimaryId, u16),
26 #[display(fmt = "indexer_n{}_progress", "_0")]
27 IndexerProgress(PrimaryId),
28 #[display(fmt = "block_height_n{}", "_0")]
29 BlockHeight(PrimaryId),
30 #[display(fmt = "networks_updated")]
31 NetworksUpdated,
32}
33
34impl From<String> for ConfigKey {
35 fn from(s: String) -> Self {
36 let re = Regex::new(r"(\d+)").unwrap();
37
38 let template = re.replace_all(&s, "{}");
39 let n = re.find_iter(&s).filter_map(|n| n.as_str().parse().ok()).collect::<Vec<i64>>();
40
41 match template.to_string().as_str() {
42 "primary" => Self::Primary,
43 "label_fetched_l{}" if n.len() == 1 => Self::LabelFetched(n[0]),
44 "indexer_tail_sync_n{}" if n.len() == 1 => Self::IndexerTailSync(n[0]),
45 "indexer_chunk_sync_n{}_b{}" if n.len() == 2 => {
46 Self::IndexerChunkSync(n[0], n[1] as BlockHeight)
47 }
48 "indexer_module_sync_n{}_m{}" if n.len() == 2 => {
49 Self::IndexerModuleSync(n[0], n[1] as u16)
50 }
51 "indexer_module_synced_n{}_m{}" if n.len() == 2 => {
52 Self::IndexerModuleSynced(n[0], n[1] as u16)
53 }
54 "indexer_n{}_progress" if n.len() == 1 => Self::IndexerProgress(n[0]),
55 "block_height_n{}" if n.len() == 1 => Self::BlockHeight(n[0]),
56 "networks_updated" => Self::NetworksUpdated,
57 _ => panic!("no match in From<String> for ConfigKey"),
58 }
59 }
60}
61
#[cfg(test)]
mod tests {
    use super::*;

    /// Each variant must render to its expected key string, and that string
    /// must convert back to the very same variant.
    #[test]
    fn test_config_key_str() {
        let cases = [
            (ConfigKey::Primary, "primary"),
            (ConfigKey::LabelFetched(123), "label_fetched_l123"),
            (ConfigKey::IndexerTailSync(123), "indexer_tail_sync_n123"),
            (ConfigKey::IndexerChunkSync(123, 456), "indexer_chunk_sync_n123_b456"),
            (ConfigKey::IndexerModuleSync(123, 456), "indexer_module_sync_n123_m456"),
            (ConfigKey::IndexerModuleSynced(123, 456), "indexer_module_synced_n123_m456"),
            (ConfigKey::IndexerProgress(123), "indexer_n123_progress"),
            (ConfigKey::BlockHeight(123), "block_height_n123"),
            (ConfigKey::NetworksUpdated, "networks_updated"),
        ];

        for (expected_key, expected_str) in cases {
            // Key -> string.
            assert_eq!(expected_key.to_string(), expected_str);
            // String -> key.
            assert_eq!(ConfigKey::from(expected_str.to_string()), expected_key);
        }
    }
}
86
87#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, DeriveEntityModel)]
88#[sea_orm(table_name = "configs")]
89#[serde(rename_all = "camelCase")]
90pub struct Model {
91 #[sea_orm(primary_key)]
92 #[serde(skip_serializing, skip_deserializing)]
93 pub config_id: PrimaryId,
94 pub key: String,
95 pub value: String,
96 #[serde(skip_serializing)]
97 pub updated_at: DateTime,
98 pub created_at: DateTime,
99}
100
101#[derive(Debug)]
102pub struct Value<T: for<'a> Deserialize<'a>> {
103 pub value: T,
104 pub updated_at: DateTime,
105 pub created_at: DateTime,
106}
107
108pub use ActiveModel as ConfigActiveModel;
109pub use Model as Config;
110
111#[derive(Copy, Clone, Debug, EnumIter)]
112pub enum Relation {}
113
114impl RelationTrait for Relation {
115 fn def(&self) -> RelationDef {
116 panic!("No RelationDef")
117 }
118}
119
// Empty impl: no hooks are overridden, so the trait's default methods apply.
impl ActiveModelBehavior for ActiveModel {}
121
122impl Model {
123 pub async fn set<T>(db: &Db, key: ConfigKey, value: T) -> Result<()>
124 where
125 T: Serialize,
126 {
127 Entity::insert(ActiveModel {
128 key: Set(key.to_string()),
129 value: Set(json!(value).to_string()),
130 updated_at: Set(utils::now()),
131 ..Default::default()
132 })
133 .on_conflict(
134 OnConflict::column(Column::Key)
135 .update_columns([Column::Value, Column::UpdatedAt])
136 .to_owned(),
137 )
138 .exec(db.get())
139 .await?;
140
141 Ok(())
142 }
143
144 pub async fn set_where<T>(
145 db: &Db,
146 key: ConfigKey,
147 value: T,
148 where_value: Value<T>,
149 ) -> Result<bool>
150 where
151 T: Serialize + for<'a> Deserialize<'a>,
152 {
153 let update_result = Entity::update_many()
154 .col_expr(Column::Value, Expr::value(json!(value).to_string()))
155 .col_expr(Column::UpdatedAt, Expr::value(utils::now()))
156 .filter(Column::Key.eq(key.to_string()))
157 .filter(Column::Value.eq(json!(where_value.value).to_string()))
158 .filter(Column::UpdatedAt.eq(where_value.updated_at))
159 .exec(db.get())
160 .await?;
161
162 Ok(update_result.rows_affected == 1)
163 }
164
165 pub async fn set_many<T>(db: &Db, values: HashMap<ConfigKey, T>) -> Result<()>
166 where
167 T: Serialize,
168 {
169 let insert_data = values
170 .into_iter()
171 .map(|(key, value)| ActiveModel {
172 key: Set(key.to_string()),
173 value: Set(json!(value).to_string()),
174 updated_at: Set(utils::now()),
175 ..Default::default()
176 })
177 .collect::<Vec<ActiveModel>>();
178
179 Entity::insert_many(insert_data)
180 .on_conflict(
181 OnConflict::column(Column::Key)
182 .update_columns([Column::Value, Column::UpdatedAt])
183 .to_owned(),
184 )
185 .exec(db.get())
186 .await?;
187
188 Ok(())
189 }
190
191 pub async fn get<T>(db: &Db, key: ConfigKey) -> Result<Option<Value<T>>>
192 where
193 T: for<'a> Deserialize<'a>,
194 {
195 Ok(Entity::find().filter(Column::Key.eq(key.to_string())).one(db.get()).await?.map(|m| {
196 Value {
197 value: serde_json::from_str(&m.value).unwrap(),
198 updated_at: m.updated_at,
199 created_at: m.created_at,
200 }
201 }))
202 }
203
204 pub async fn get_many<T>(db: &Db, keys: Vec<ConfigKey>) -> Result<HashMap<ConfigKey, Value<T>>>
205 where
206 T: for<'a> Deserialize<'a>,
207 {
208 Ok(Entity::find()
209 .filter(Column::Key.is_in(keys.iter().map(|k| k.to_string())))
210 .all(db.get())
211 .await?
212 .into_iter()
213 .map(|m| {
214 (
215 m.key.into(),
216 Value {
217 value: serde_json::from_str(&m.value).unwrap(),
218 updated_at: m.updated_at,
219 created_at: m.created_at,
220 },
221 )
222 })
223 .collect())
224 }
225
226 pub async fn get_many_by_keyword<T>(
227 db: &Db,
228 keyword: &str,
229 ) -> Result<HashMap<ConfigKey, Value<T>>>
230 where
231 T: for<'a> Deserialize<'a>,
232 {
233 Ok(Entity::find()
234 .filter(Self::get_keyword_condition(keyword))
235 .all(db.get())
236 .await?
237 .into_iter()
238 .map(|m| {
239 (
240 m.key.into(),
241 Value {
242 value: serde_json::from_str(&m.value).unwrap(),
243 updated_at: m.updated_at,
244 created_at: m.created_at,
245 },
246 )
247 })
248 .collect())
249 }
250
251 pub async fn delete(db: &Db, key: ConfigKey) -> Result<()> {
252 Entity::delete_many().filter(Column::Key.eq(key.to_string())).exec(db.get()).await?;
253 Ok(())
254 }
255
256 pub async fn delete_all_by_keyword(db: &Db, keyword: &str) -> Result<()> {
257 Entity::delete_many().filter(Self::get_keyword_condition(keyword)).exec(db.get()).await?;
258
259 Ok(())
260 }
261
262 fn get_keyword_condition(keyword: &str) -> Condition {
263 Condition::any()
264 .add(Column::Key.like(&format!("%_{keyword}_%")))
265 .add(Column::Key.like(&format!("%_{keyword}")))
266 }
267}