barreleye_common/models/
config.rs

1use derive_more::Display;
2use eyre::Result;
3use regex::Regex;
4use sea_orm::{entity::prelude::*, Condition, Set};
5use sea_orm_migration::prelude::{Expr, OnConflict};
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::collections::HashMap;
9
10use crate::{models::PrimaryId, utils, BlockHeight, Db};
11
12#[derive(Display, Debug, Copy, Clone, PartialEq, Eq, Hash)]
13pub enum ConfigKey {
14	#[display(fmt = "primary")]
15	Primary,
16	#[display(fmt = "label_fetched_l{}", "_0")]
17	LabelFetched(PrimaryId),
18	#[display(fmt = "indexer_tail_sync_n{}", "_0")]
19	IndexerTailSync(PrimaryId),
20	#[display(fmt = "indexer_chunk_sync_n{}_b{}", "_0", "_1")]
21	IndexerChunkSync(PrimaryId, BlockHeight),
22	#[display(fmt = "indexer_module_sync_n{}_m{}", "_0", "_1")]
23	IndexerModuleSync(PrimaryId, u16),
24	#[display(fmt = "indexer_module_synced_n{}_m{}", "_0", "_1")]
25	IndexerModuleSynced(PrimaryId, u16),
26	#[display(fmt = "indexer_n{}_progress", "_0")]
27	IndexerProgress(PrimaryId),
28	#[display(fmt = "block_height_n{}", "_0")]
29	BlockHeight(PrimaryId),
30	#[display(fmt = "networks_updated")]
31	NetworksUpdated,
32}
33
34impl From<String> for ConfigKey {
35	fn from(s: String) -> Self {
36		let re = Regex::new(r"(\d+)").unwrap();
37
38		let template = re.replace_all(&s, "{}");
39		let n = re.find_iter(&s).filter_map(|n| n.as_str().parse().ok()).collect::<Vec<i64>>();
40
41		match template.to_string().as_str() {
42			"primary" => Self::Primary,
43			"label_fetched_l{}" if n.len() == 1 => Self::LabelFetched(n[0]),
44			"indexer_tail_sync_n{}" if n.len() == 1 => Self::IndexerTailSync(n[0]),
45			"indexer_chunk_sync_n{}_b{}" if n.len() == 2 => {
46				Self::IndexerChunkSync(n[0], n[1] as BlockHeight)
47			}
48			"indexer_module_sync_n{}_m{}" if n.len() == 2 => {
49				Self::IndexerModuleSync(n[0], n[1] as u16)
50			}
51			"indexer_module_synced_n{}_m{}" if n.len() == 2 => {
52				Self::IndexerModuleSynced(n[0], n[1] as u16)
53			}
54			"indexer_n{}_progress" if n.len() == 1 => Self::IndexerProgress(n[0]),
55			"block_height_n{}" if n.len() == 1 => Self::BlockHeight(n[0]),
56			"networks_updated" => Self::NetworksUpdated,
57			_ => panic!("no match in From<String> for ConfigKey"),
58		}
59	}
60}
61
62#[cfg(test)]
63mod tests {
64	use super::*;
65
66	#[test]
67	fn test_config_key_str() {
68		let config_keys = HashMap::from([
69			(ConfigKey::Primary, "primary"),
70			(ConfigKey::LabelFetched(123), "label_fetched_l123"),
71			(ConfigKey::IndexerTailSync(123), "indexer_tail_sync_n123"),
72			(ConfigKey::IndexerChunkSync(123, 456), "indexer_chunk_sync_n123_b456"),
73			(ConfigKey::IndexerModuleSync(123, 456), "indexer_module_sync_n123_m456"),
74			(ConfigKey::IndexerModuleSynced(123, 456), "indexer_module_synced_n123_m456"),
75			(ConfigKey::IndexerProgress(123), "indexer_n123_progress"),
76			(ConfigKey::BlockHeight(123), "block_height_n123"),
77			(ConfigKey::NetworksUpdated, "networks_updated"),
78		]);
79
80		for (config_key, config_key_str) in config_keys.into_iter() {
81			assert_eq!(config_key.to_string(), config_key_str);
82			assert_eq!(Into::<ConfigKey>::into(config_key_str.to_string()), config_key);
83		}
84	}
85}
86
87#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, DeriveEntityModel)]
88#[sea_orm(table_name = "configs")]
89#[serde(rename_all = "camelCase")]
90pub struct Model {
91	#[sea_orm(primary_key)]
92	#[serde(skip_serializing, skip_deserializing)]
93	pub config_id: PrimaryId,
94	pub key: String,
95	pub value: String,
96	#[serde(skip_serializing)]
97	pub updated_at: DateTime,
98	pub created_at: DateTime,
99}
100
101#[derive(Debug)]
102pub struct Value<T: for<'a> Deserialize<'a>> {
103	pub value: T,
104	pub updated_at: DateTime,
105	pub created_at: DateTime,
106}
107
108pub use ActiveModel as ConfigActiveModel;
109pub use Model as Config;
110
111#[derive(Copy, Clone, Debug, EnumIter)]
112pub enum Relation {}
113
114impl RelationTrait for Relation {
115	fn def(&self) -> RelationDef {
116		panic!("No RelationDef")
117	}
118}
119
120impl ActiveModelBehavior for ActiveModel {}
121
122impl Model {
123	pub async fn set<T>(db: &Db, key: ConfigKey, value: T) -> Result<()>
124	where
125		T: Serialize,
126	{
127		Entity::insert(ActiveModel {
128			key: Set(key.to_string()),
129			value: Set(json!(value).to_string()),
130			updated_at: Set(utils::now()),
131			..Default::default()
132		})
133		.on_conflict(
134			OnConflict::column(Column::Key)
135				.update_columns([Column::Value, Column::UpdatedAt])
136				.to_owned(),
137		)
138		.exec(db.get())
139		.await?;
140
141		Ok(())
142	}
143
144	pub async fn set_where<T>(
145		db: &Db,
146		key: ConfigKey,
147		value: T,
148		where_value: Value<T>,
149	) -> Result<bool>
150	where
151		T: Serialize + for<'a> Deserialize<'a>,
152	{
153		let update_result = Entity::update_many()
154			.col_expr(Column::Value, Expr::value(json!(value).to_string()))
155			.col_expr(Column::UpdatedAt, Expr::value(utils::now()))
156			.filter(Column::Key.eq(key.to_string()))
157			.filter(Column::Value.eq(json!(where_value.value).to_string()))
158			.filter(Column::UpdatedAt.eq(where_value.updated_at))
159			.exec(db.get())
160			.await?;
161
162		Ok(update_result.rows_affected == 1)
163	}
164
165	pub async fn set_many<T>(db: &Db, values: HashMap<ConfigKey, T>) -> Result<()>
166	where
167		T: Serialize,
168	{
169		let insert_data = values
170			.into_iter()
171			.map(|(key, value)| ActiveModel {
172				key: Set(key.to_string()),
173				value: Set(json!(value).to_string()),
174				updated_at: Set(utils::now()),
175				..Default::default()
176			})
177			.collect::<Vec<ActiveModel>>();
178
179		Entity::insert_many(insert_data)
180			.on_conflict(
181				OnConflict::column(Column::Key)
182					.update_columns([Column::Value, Column::UpdatedAt])
183					.to_owned(),
184			)
185			.exec(db.get())
186			.await?;
187
188		Ok(())
189	}
190
191	pub async fn get<T>(db: &Db, key: ConfigKey) -> Result<Option<Value<T>>>
192	where
193		T: for<'a> Deserialize<'a>,
194	{
195		Ok(Entity::find().filter(Column::Key.eq(key.to_string())).one(db.get()).await?.map(|m| {
196			Value {
197				value: serde_json::from_str(&m.value).unwrap(),
198				updated_at: m.updated_at,
199				created_at: m.created_at,
200			}
201		}))
202	}
203
204	pub async fn get_many<T>(db: &Db, keys: Vec<ConfigKey>) -> Result<HashMap<ConfigKey, Value<T>>>
205	where
206		T: for<'a> Deserialize<'a>,
207	{
208		Ok(Entity::find()
209			.filter(Column::Key.is_in(keys.iter().map(|k| k.to_string())))
210			.all(db.get())
211			.await?
212			.into_iter()
213			.map(|m| {
214				(
215					m.key.into(),
216					Value {
217						value: serde_json::from_str(&m.value).unwrap(),
218						updated_at: m.updated_at,
219						created_at: m.created_at,
220					},
221				)
222			})
223			.collect())
224	}
225
226	pub async fn get_many_by_keyword<T>(
227		db: &Db,
228		keyword: &str,
229	) -> Result<HashMap<ConfigKey, Value<T>>>
230	where
231		T: for<'a> Deserialize<'a>,
232	{
233		Ok(Entity::find()
234			.filter(Self::get_keyword_condition(keyword))
235			.all(db.get())
236			.await?
237			.into_iter()
238			.map(|m| {
239				(
240					m.key.into(),
241					Value {
242						value: serde_json::from_str(&m.value).unwrap(),
243						updated_at: m.updated_at,
244						created_at: m.created_at,
245					},
246				)
247			})
248			.collect())
249	}
250
251	pub async fn delete(db: &Db, key: ConfigKey) -> Result<()> {
252		Entity::delete_many().filter(Column::Key.eq(key.to_string())).exec(db.get()).await?;
253		Ok(())
254	}
255
256	pub async fn delete_all_by_keyword(db: &Db, keyword: &str) -> Result<()> {
257		Entity::delete_many().filter(Self::get_keyword_condition(keyword)).exec(db.get()).await?;
258
259		Ok(())
260	}
261
262	fn get_keyword_condition(keyword: &str) -> Condition {
263		Condition::any()
264			.add(Column::Key.like(&format!("%_{keyword}_%")))
265			.add(Column::Key.like(&format!("%_{keyword}")))
266	}
267}