ice9_std/
config_loader.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use crb::agent::{
4    Address, Agent, AgentSession, Context, Duty, ManagedContext, Next, OnEvent, ToAddress,
5};
6use crb::core::UniqueId;
7use crb::send::{Recipient, Sender};
8use crb::superagent::{ManageSubscription, Subscription, Timeout};
9use derive_more::{Deref, DerefMut, From};
10use notify::{
11    recommended_watcher, Event, EventHandler, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
12};
13use std::collections::HashSet;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::fs;
18use toml::{Table, Value};
19
/// File name of a config layer. It is joined onto the global config directory
/// and also used as-is (a relative path) for the local layer.
const CONFIG_NAME: &str = "ice9.toml";
/// Relative path that the `StoreTemplate` handler writes the rendered template to.
const TEMPLATE_NAME: &str = "ice9.example.toml";
22
23pub struct ConfigLayer {
24    path: Arc<PathBuf>,
25    config: Value,
26    _watcher: RecommendedWatcher,
27}
28
29impl ConfigLayer {
30    async fn read_config(&mut self) -> Result<()> {
31        log::info!("Reading the config layer: {}", self.path.display());
32        let content = fs::read_to_string(self.path.as_ref()).await?;
33        let config = toml::from_str(&content)?;
34        self.config = config;
35        Ok(())
36    }
37}
38
39pub struct ChangedFiles {
40    _debouncer: Timeout,
41    files: HashSet<Arc<PathBuf>>,
42}
43
44impl ChangedFiles {
45    fn new(debouncer: Timeout) -> Self {
46        Self {
47            _debouncer: debouncer,
48            files: HashSet::new(),
49        }
50    }
51}
52
53pub struct ConfigLoader {
54    layers: Vec<ConfigLayer>,
55    changed_files: Option<ChangedFiles>,
56    subscribers: HashSet<UniqueId<ConfigUpdates>>,
57    merged_config: Value,
58}
59
60impl ConfigLoader {
61    pub fn new() -> Self {
62        Self {
63            layers: Vec::new(),
64            changed_files: None,
65            subscribers: HashSet::new(),
66            merged_config: table(),
67        }
68    }
69}
70
71impl Agent for ConfigLoader {
72    type Context = AgentSession<Self>;
73
74    fn begin(&mut self) -> Next<Self> {
75        Next::duty(Initialize)
76    }
77
78    fn interrupt(&mut self, ctx: &mut Context<Self>) {
79        self.changed_files.take();
80        ctx.shutdown();
81    }
82}
83
84impl ConfigLoader {
85    async fn add_layer(&mut self, path: PathBuf, ctx: &mut Context<Self>) -> Result<()> {
86        log::info!("Add a config layer: {}", path.display());
87        let path = Arc::new(path);
88
89        // Create a config file if doesn't exist
90        if !path.exists() {
91            log::info!("Creating an empty config layer: {}", path.display());
92            fs::write(path.as_ref(), "").await?;
93        }
94
95        // Setup a watcher for file
96        let forwarder = EventsForwarder::new(ctx, path.clone());
97        let mut watcher = recommended_watcher(forwarder)?;
98        watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)?;
99
100        // Read a config
101        let mut layer = ConfigLayer {
102            path,
103            config: table(),
104            _watcher: watcher,
105        };
106        layer.read_config().await?;
107
108        self.layers.push(layer);
109        Ok(())
110    }
111
112    async fn update_configs(&mut self, all: bool) -> Result<()> {
113        let changed_files = self
114            .changed_files
115            .take()
116            .map(|record| record.files)
117            .unwrap_or_default();
118        let mut new_merged_config = table();
119        for layer in &mut self.layers {
120            if all || changed_files.contains(&layer.path) {
121                layer.read_config().await?;
122            }
123            merge_configs(&mut new_merged_config, &layer.config);
124        }
125        if self.merged_config != new_merged_config {
126            let new_config = NewConfig(new_merged_config.clone());
127            for subscriber in &self.subscribers {
128                subscriber.send(new_config.clone()).ok();
129            }
130            self.merged_config = new_merged_config;
131        }
132        Ok(())
133    }
134
135    fn schedule_update(&mut self, path: Arc<PathBuf>, ctx: &mut Context<Self>) -> Result<()> {
136        match self.changed_files.as_mut() {
137            Some(changed_files) => {
138                changed_files.files.insert(path);
139            }
140            None => {
141                let address = ctx.address().clone();
142                let duration = Duration::from_millis(250);
143                let timeout = Timeout::new(address, duration, ());
144                let mut changed_files = ChangedFiles::new(timeout);
145                changed_files.files.insert(path);
146                self.changed_files = Some(changed_files);
147            }
148        }
149        Ok(())
150    }
151
152    fn current_config(&self) -> Value {
153        self.merged_config.clone()
154    }
155}
156
157struct Initialize;
158
159#[async_trait]
160impl Duty<Initialize> for ConfigLoader {
161    async fn handle(&mut self, _: Initialize, ctx: &mut Context<Self>) -> Result<Next<Self>> {
162        // Global config layer: ~/.config/ice9.toml
163        let config_dir = dirs::home_dir()
164            .ok_or_else(|| anyhow!("Config dir is not provided."))?
165            .join(".config")
166            .join("ice9");
167        fs::create_dir_all(&config_dir).await?;
168        let global_config = config_dir.join(CONFIG_NAME);
169        self.add_layer(global_config, ctx).await?;
170
171        // Local config layer: $PWD/ice9.toml
172        let local_config = CONFIG_NAME.into();
173        self.add_layer(local_config, ctx).await?;
174
175        self.update_configs(true).await?;
176
177        Ok(Next::events())
178    }
179}
180
181#[derive(From)]
182struct EventsForwarder {
183    tag: Arc<PathBuf>,
184    address: Address<ConfigLoader>,
185}
186
187impl EventsForwarder {
188    pub fn new(address: impl ToAddress<ConfigLoader>, tag: Arc<PathBuf>) -> Self {
189        Self {
190            tag,
191            address: address.to_address(),
192        }
193    }
194}
195
196impl EventHandler for EventsForwarder {
197    fn handle_event(&mut self, result: WatchResult) {
198        let event = WatchEvent {
199            tag: self.tag.clone(),
200            result,
201        };
202        self.address.event(event).ok();
203    }
204}
205
/// Outcome passed to the watcher callback: a file-system event or a `notify` error.
type WatchResult = Result<Event, notify::Error>;
207
208struct WatchEvent {
209    tag: Arc<PathBuf>,
210    result: WatchResult,
211}
212
213#[async_trait]
214impl OnEvent<WatchEvent> for ConfigLoader {
215    async fn handle(&mut self, msg: WatchEvent, ctx: &mut Context<Self>) -> Result<()> {
216        let event = msg.result?;
217        match event.kind {
218            EventKind::Create(_) | EventKind::Modify(_) => {
219                self.schedule_update(msg.tag, ctx)?;
220            }
221            _other => {
222                // TODO: How to handle other methods? What if the config was removed?
223            }
224        }
225        Ok(())
226    }
227}
228
229#[async_trait]
230impl OnEvent<()> for ConfigLoader {
231    async fn handle(&mut self, _: (), _ctx: &mut Context<Self>) -> Result<()> {
232        self.update_configs(false).await
233    }
234}
235
/// Event sent to subscribers; it carries the full merged config after a change.
#[derive(Clone)]
pub struct NewConfig(pub Value);
238
239#[derive(Deref, DerefMut)]
240pub struct ConfigUpdates {
241    recipient: Recipient<NewConfig>,
242}
243
244impl ConfigUpdates {
245    pub fn for_listener<A>(addr: impl ToAddress<A>) -> Self
246    where
247        A: OnEvent<NewConfig>,
248    {
249        Self {
250            recipient: addr.to_address().recipient(),
251        }
252    }
253}
254
impl Subscription for ConfigUpdates {
    // Same type that `ManageSubscription::subscribe` returns for this
    // subscription in this file (the current merged config).
    type State = Value;
}
258
259#[async_trait]
260impl ManageSubscription<ConfigUpdates> for ConfigLoader {
261    async fn subscribe(
262        &mut self,
263        sub_id: UniqueId<ConfigUpdates>,
264        _ctx: &mut Context<Self>,
265    ) -> Result<Value> {
266        // Read on initialze and keep
267        self.subscribers.insert(sub_id);
268        let value = self.current_config();
269        Ok(value)
270    }
271
272    async fn unsubscribe(
273        &mut self,
274        sub_id: UniqueId<ConfigUpdates>,
275        _ctx: &mut Context<Self>,
276    ) -> Result<()> {
277        self.subscribers.remove(&sub_id);
278        Ok(())
279    }
280}
281
282pub struct StoreTemplate(pub Value);
283
284#[async_trait]
285impl OnEvent<StoreTemplate> for ConfigLoader {
286    async fn handle(&mut self, msg: StoreTemplate, _ctx: &mut Context<Self>) -> Result<()> {
287        let content = toml::to_string_pretty(&msg.0)?;
288        fs::write(TEMPLATE_NAME, content).await?;
289        Ok(())
290    }
291}
292
293pub fn wrap_level(title: &str, value: Value) -> Value {
294    let mut wrapper = Table::new();
295    wrapper.insert(title.into(), value);
296    Value::Table(wrapper)
297}
298
299pub fn table() -> Value {
300    Value::Table(Table::new())
301}
302
303pub fn merge_configs(base: &mut Value, overlay: &Value) {
304    if let (Value::Table(base_table), Value::Table(overlay_table)) = (base, overlay) {
305        for (key, overlay_value) in overlay_table {
306            match base_table.get_mut(key) {
307                Some(base_value) => {
308                    // If both values are tables, recursively merge them
309                    if overlay_value.is_table() && base_value.is_table() {
310                        merge_configs(base_value, overlay_value);
311                    } else {
312                        // Otherwise, overlay value overwrites base value
313                        *base_value = overlay_value.clone();
314                    }
315                }
316                None => {
317                    // If key doesn't exist in base, add it
318                    base_table.insert(key.clone(), overlay_value.clone());
319                }
320            }
321        }
322    }
323}