1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
//! Tests for the ConfigWatcher callback plumbing.
//!
//! Previously lived inline in `utils/config_watcher.rs` as
//! `#[cfg(test)] mod tests`. Moved out per the project convention that
//! all tests live under `src/tests/`.
//!
//! The `fires_on_change` test builds its own mini-watcher (instead of
//! calling the production `spawn()`) because `spawn()` requires real
//! Config/keys TOML files in the user's home dir — the test just needs
//! to verify that a filesystem change triggers the callback plumbing.
use crate::utils::config_watcher::ReloadCallback;
use notify::Watcher;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Compile-time guard: `ReloadCallback` must satisfy `Send + Sync`.
///
/// Nothing happens at runtime; the test passes as soon as the generic bound
/// on the helper below type-checks for `ReloadCallback`.
#[test]
fn reload_callback_type_is_send_sync() {
    // Instantiating this helper forces the compiler to prove the bounds.
    fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    require_send_sync::<ReloadCallback>();
}
/// Checks that a change to a watched file ends up invoking a `ReloadCallback`.
///
/// A miniature watcher runs on a blocking thread: it subscribes to the two
/// temp files with `notify`, debounces bursts of events for 100ms, and then
/// invokes every callback on the tokio runtime with `Config::default()`.
/// The test body rewrites `config.toml` up to three times and waits for the
/// callback to signal back over a channel.
#[tokio::test]
async fn reload_callback_fires_on_change() {
    // Raises the shared stop flag when dropped, so the watcher thread is
    // released on every exit path of the test, including a failed assertion.
    struct StopOnDrop(Arc<std::sync::atomic::AtomicBool>);

    impl Drop for StopOnDrop {
        fn drop(&mut self) {
            self.0.store(true, Ordering::Relaxed);
        }
    }

    let tmp = tempfile::tempdir().unwrap();
    let config_path = tmp.path().join("config.toml");
    let keys_path = tmp.path().join("keys.toml");
    std::fs::write(&config_path, "[channels.telegram]\nenabled = false\n").unwrap();
    std::fs::write(&keys_path, "").unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let counter = call_count.clone();
    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(4);
    let cb: ReloadCallback = Arc::new(move |_cfg| {
        counter.fetch_add(1, Ordering::Relaxed);
        let _ = tx.try_send(());
    });

    // Dropping the tokio runtime blocks until every running spawn_blocking
    // task has returned. Without a stop signal the test would therefore
    // always last as long as the watcher's hard deadline, even when the
    // callback fires immediately. The guard flips the flag when the test
    // body is left, and the watcher loop polls it.
    let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let _stop_guard = StopOnDrop(stop.clone());

    // Spawn the watcher and discard its JoinHandle. spawn_blocking tasks are
    // not cancelled by dropping the handle, so the watcher keeps running while
    // the test below mutates the files and waits for the callback. drop() is
    // the explicit, lint-clean way to discard the returned future.
    drop({
        let config_path = config_path.clone();
        let keys_path = keys_path.clone();
        let callbacks = vec![cb];
        let stop = stop.clone();
        tokio::task::spawn_blocking(move || {
            let rt = tokio::runtime::Handle::current();
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = notify::recommended_watcher(move |res| {
                if let Ok(event) = res {
                    let _ = tx.send(event);
                }
            })
            .unwrap();
            let _ = watcher.watch(&config_path, notify::RecursiveMode::NonRecursive);
            let _ = watcher.watch(&keys_path, notify::RecursiveMode::NonRecursive);

            let debounce = std::time::Duration::from_millis(100);
            // Safety-net deadline in case the stop flag is never raised, so
            // the blocking thread cannot hang runtime shutdown forever. 20s
            // leaves the test room to retry writes if FSEvents on macOS
            // coalesces or drops an early event under heavy parallel-test
            // CPU load.
            let end = std::time::Instant::now() + std::time::Duration::from_secs(20);

            while !stop.load(Ordering::Relaxed) {
                let remaining = end.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    break;
                }
                // Short poll interval so the stop flag and the deadline are
                // re-checked regularly even when no events arrive.
                let poll = remaining.min(std::time::Duration::from_millis(200));
                match rx.recv_timeout(poll) {
                    Ok(_) => {}
                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
                }

                // Debounce: swallow follow-up events until the window that
                // started with the first event has elapsed.
                let deadline = std::time::Instant::now() + debounce;
                loop {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        break;
                    }
                    match rx.recv_timeout(remaining) {
                        Ok(_) => {}
                        Err(_) => break,
                    }
                }

                for cb in &callbacks {
                    let cb = cb.clone();
                    rt.spawn(async move { cb(crate::config::Config::default()) });
                }
            }
        })
    });

    // Give the watcher thread time to register its subscription before
    // the first write. FSEvents on macOS needs a moment after watch()
    // to actually wire up notifications.
    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

    // Write a few times with spacing. FSEvents under heavy parallel
    // test load can coalesce or drop rapid events — retrying 3 times
    // with a 400ms gap between writes gives the watcher more than one
    // chance to fire the callback. Each write flips the value so the
    // file content actually changes.
    for i in 0..3 {
        let val = if i % 2 == 0 { "true" } else { "false" };
        std::fs::write(
            &config_path,
            format!("[channels.telegram]\nenabled = {}\n", val),
        )
        .unwrap();

        // Only a delivered message counts. `Ok(None)` means the channel was
        // closed because the watcher side went away, which is not a callback.
        let fired = matches!(
            tokio::time::timeout(std::time::Duration::from_millis(3000), rx.recv()).await,
            Ok(Some(()))
        );
        if fired {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(400)).await;
    }

    assert!(
        call_count.load(Ordering::Relaxed) >= 1,
        "callback should have fired at least once after file changes (got {} calls)",
        call_count.load(Ordering::Relaxed)
    );
}
/// Atomic saves (write a temp file, then rename over the target) are how
/// editors and our own `toml_edit` writes persist config. They change the
/// file's inode, so a file-level watch silently misses them. The production
/// watcher fixes this by watching the DIRECTORY and filtering by filename —
/// this test pins that behavior (a rename over config.toml is detected).
#[tokio::test]
async fn atomic_save_via_rename_is_detected_with_dir_watch() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_path_buf();
let config_path = dir.join("config.toml");
std::fs::write(&config_path, "[agent]\n").unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(4);
drop({
let dir = dir.clone();
tokio::task::spawn_blocking(move || {
let (etx, erx) = std::sync::mpsc::channel();
let mut watcher =
notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
if let Ok(event) = res {
// Mirror production: react only to the three config files.
let relevant = event.paths.iter().any(|p| {
matches!(
p.file_name().and_then(|n| n.to_str()),
Some("config.toml" | "keys.toml" | "commands.toml")
)
});
if relevant {
let _ = etx.send(());
}
}
})
.unwrap();
// Watch the DIRECTORY, like production does.
let _ = watcher.watch(&dir, notify::RecursiveMode::NonRecursive);
let end = std::time::Instant::now() + std::time::Duration::from_secs(20);
loop {
let remaining = end.saturating_duration_since(std::time::Instant::now());
if remaining.is_zero() {
break;
}
match erx.recv_timeout(remaining.min(std::time::Duration::from_millis(200))) {
Ok(_) => {
let _ = tx.try_send(());
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
}
}
})
});
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let mut detected = false;
for i in 0..3 {
// Write to a temp file, then rename over the target — the atomic-save
// pattern a file-level watch would miss.
let tmp_path = dir.join(format!("config.toml.tmp{i}"));
std::fs::write(
&tmp_path,
format!("[agent]\ncontext_limit = {}\n", 200_000 + i),
)
.unwrap();
std::fs::rename(&tmp_path, &config_path).unwrap();
if tokio::time::timeout(std::time::Duration::from_millis(3000), rx.recv())
.await
.is_ok()
{
detected = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(400)).await;
}
assert!(
detected,
"directory watch must detect an atomic save (write temp + rename over config.toml)"
);
}