1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use crb::agent::{
4 Address, Agent, AgentSession, Context, Duty, ManagedContext, Next, OnEvent, ToAddress,
5};
6use crb::core::UniqueId;
7use crb::send::{Recipient, Sender};
8use crb::superagent::{ManageSubscription, Subscription, Timeout};
9use derive_more::{Deref, DerefMut, From};
10use notify::{
11 recommended_watcher, Event, EventHandler, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
12};
13use std::collections::HashSet;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::fs;
18use toml::{Table, Value};
19
20const CONFIG_NAME: &str = "ice9.toml";
21const TEMPLATE_NAME: &str = "ice9.example.toml";
22
23pub struct ConfigLayer {
24 path: Arc<PathBuf>,
25 config: Value,
26 _watcher: RecommendedWatcher,
27}
28
29impl ConfigLayer {
30 async fn read_config(&mut self) -> Result<()> {
31 log::info!("Reading the config layer: {}", self.path.display());
32 let content = fs::read_to_string(self.path.as_ref()).await?;
33 let config = toml::from_str(&content)?;
34 self.config = config;
35 Ok(())
36 }
37}
38
39pub struct ChangedFiles {
40 _debouncer: Timeout,
41 files: HashSet<Arc<PathBuf>>,
42}
43
44impl ChangedFiles {
45 fn new(debouncer: Timeout) -> Self {
46 Self {
47 _debouncer: debouncer,
48 files: HashSet::new(),
49 }
50 }
51}
52
53pub struct ConfigLoader {
54 layers: Vec<ConfigLayer>,
55 changed_files: Option<ChangedFiles>,
56 subscribers: HashSet<UniqueId<ConfigUpdates>>,
57 merged_config: Value,
58}
59
60impl ConfigLoader {
61 pub fn new() -> Self {
62 Self {
63 layers: Vec::new(),
64 changed_files: None,
65 subscribers: HashSet::new(),
66 merged_config: table(),
67 }
68 }
69}
70
71impl Agent for ConfigLoader {
72 type Context = AgentSession<Self>;
73
74 fn begin(&mut self) -> Next<Self> {
75 Next::duty(Initialize)
76 }
77
78 fn interrupt(&mut self, ctx: &mut Context<Self>) {
79 self.changed_files.take();
80 ctx.shutdown();
81 }
82}
83
84impl ConfigLoader {
85 async fn add_layer(&mut self, path: PathBuf, ctx: &mut Context<Self>) -> Result<()> {
86 log::info!("Add a config layer: {}", path.display());
87 let path = Arc::new(path);
88
89 if !path.exists() {
91 log::info!("Creating an empty config layer: {}", path.display());
92 fs::write(path.as_ref(), "").await?;
93 }
94
95 let forwarder = EventsForwarder::new(ctx, path.clone());
97 let mut watcher = recommended_watcher(forwarder)?;
98 watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)?;
99
100 let mut layer = ConfigLayer {
102 path,
103 config: table(),
104 _watcher: watcher,
105 };
106 layer.read_config().await?;
107
108 self.layers.push(layer);
109 Ok(())
110 }
111
112 async fn update_configs(&mut self, all: bool) -> Result<()> {
113 let changed_files = self
114 .changed_files
115 .take()
116 .map(|record| record.files)
117 .unwrap_or_default();
118 let mut new_merged_config = table();
119 for layer in &mut self.layers {
120 if all || changed_files.contains(&layer.path) {
121 layer.read_config().await?;
122 }
123 merge_configs(&mut new_merged_config, &layer.config);
124 }
125 if self.merged_config != new_merged_config {
126 let new_config = NewConfig(new_merged_config.clone());
127 for subscriber in &self.subscribers {
128 subscriber.send(new_config.clone()).ok();
129 }
130 self.merged_config = new_merged_config;
131 }
132 Ok(())
133 }
134
135 fn schedule_update(&mut self, path: Arc<PathBuf>, ctx: &mut Context<Self>) -> Result<()> {
136 match self.changed_files.as_mut() {
137 Some(changed_files) => {
138 changed_files.files.insert(path);
139 }
140 None => {
141 let address = ctx.address().clone();
142 let duration = Duration::from_millis(250);
143 let timeout = Timeout::new(address, duration, ());
144 let mut changed_files = ChangedFiles::new(timeout);
145 changed_files.files.insert(path);
146 self.changed_files = Some(changed_files);
147 }
148 }
149 Ok(())
150 }
151
152 fn current_config(&self) -> Value {
153 self.merged_config.clone()
154 }
155}
156
157struct Initialize;
158
159#[async_trait]
160impl Duty<Initialize> for ConfigLoader {
161 async fn handle(&mut self, _: Initialize, ctx: &mut Context<Self>) -> Result<Next<Self>> {
162 let config_dir = dirs::home_dir()
164 .ok_or_else(|| anyhow!("Config dir is not provided."))?
165 .join(".config")
166 .join("ice9");
167 fs::create_dir_all(&config_dir).await?;
168 let global_config = config_dir.join(CONFIG_NAME);
169 self.add_layer(global_config, ctx).await?;
170
171 let local_config = CONFIG_NAME.into();
173 self.add_layer(local_config, ctx).await?;
174
175 self.update_configs(true).await?;
176
177 Ok(Next::events())
178 }
179}
180
181#[derive(From)]
182struct EventsForwarder {
183 tag: Arc<PathBuf>,
184 address: Address<ConfigLoader>,
185}
186
187impl EventsForwarder {
188 pub fn new(address: impl ToAddress<ConfigLoader>, tag: Arc<PathBuf>) -> Self {
189 Self {
190 tag,
191 address: address.to_address(),
192 }
193 }
194}
195
196impl EventHandler for EventsForwarder {
197 fn handle_event(&mut self, result: WatchResult) {
198 let event = WatchEvent {
199 tag: self.tag.clone(),
200 result,
201 };
202 self.address.event(event).ok();
203 }
204}
205
206type WatchResult = Result<Event, notify::Error>;
207
208struct WatchEvent {
209 tag: Arc<PathBuf>,
210 result: WatchResult,
211}
212
213#[async_trait]
214impl OnEvent<WatchEvent> for ConfigLoader {
215 async fn handle(&mut self, msg: WatchEvent, ctx: &mut Context<Self>) -> Result<()> {
216 let event = msg.result?;
217 match event.kind {
218 EventKind::Create(_) | EventKind::Modify(_) => {
219 self.schedule_update(msg.tag, ctx)?;
220 }
221 _other => {
222 }
224 }
225 Ok(())
226 }
227}
228
229#[async_trait]
230impl OnEvent<()> for ConfigLoader {
231 async fn handle(&mut self, _: (), _ctx: &mut Context<Self>) -> Result<()> {
232 self.update_configs(false).await
233 }
234}
235
236#[derive(Clone)]
237pub struct NewConfig(pub Value);
238
239#[derive(Deref, DerefMut)]
240pub struct ConfigUpdates {
241 recipient: Recipient<NewConfig>,
242}
243
244impl ConfigUpdates {
245 pub fn for_listener<A>(addr: impl ToAddress<A>) -> Self
246 where
247 A: OnEvent<NewConfig>,
248 {
249 Self {
250 recipient: addr.to_address().recipient(),
251 }
252 }
253}
254
255impl Subscription for ConfigUpdates {
256 type State = Value;
257}
258
259#[async_trait]
260impl ManageSubscription<ConfigUpdates> for ConfigLoader {
261 async fn subscribe(
262 &mut self,
263 sub_id: UniqueId<ConfigUpdates>,
264 _ctx: &mut Context<Self>,
265 ) -> Result<Value> {
266 self.subscribers.insert(sub_id);
268 let value = self.current_config();
269 Ok(value)
270 }
271
272 async fn unsubscribe(
273 &mut self,
274 sub_id: UniqueId<ConfigUpdates>,
275 _ctx: &mut Context<Self>,
276 ) -> Result<()> {
277 self.subscribers.remove(&sub_id);
278 Ok(())
279 }
280}
281
282pub struct StoreTemplate(pub Value);
283
284#[async_trait]
285impl OnEvent<StoreTemplate> for ConfigLoader {
286 async fn handle(&mut self, msg: StoreTemplate, _ctx: &mut Context<Self>) -> Result<()> {
287 let content = toml::to_string_pretty(&msg.0)?;
288 fs::write(TEMPLATE_NAME, content).await?;
289 Ok(())
290 }
291}
292
293pub fn wrap_level(title: &str, value: Value) -> Value {
294 let mut wrapper = Table::new();
295 wrapper.insert(title.into(), value);
296 Value::Table(wrapper)
297}
298
299pub fn table() -> Value {
300 Value::Table(Table::new())
301}
302
303pub fn merge_configs(base: &mut Value, overlay: &Value) {
304 if let (Value::Table(base_table), Value::Table(overlay_table)) = (base, overlay) {
305 for (key, overlay_value) in overlay_table {
306 match base_table.get_mut(key) {
307 Some(base_value) => {
308 if overlay_value.is_table() && base_value.is_table() {
310 merge_configs(base_value, overlay_value);
311 } else {
312 *base_value = overlay_value.clone();
314 }
315 }
316 None => {
317 base_table.insert(key.clone(), overlay_value.clone());
319 }
320 }
321 }
322 }
323}