grafix_toolbox/kit/policies/func/
file.rs1use crate::{asyn::*, lib::*};
2use std::path::{Path, PathBuf};
3
4pub mod Save {
5 use super::*;
6
7 pub fn Write(p: impl Into<Astr>, data: impl Into<Arc<[u8]>>) {
8 sender().send((p.into(), MessageType::Write, data.into())).valid();
9 }
10 pub fn Append(p: impl Into<Astr>, data: impl Into<Arc<[u8]>>) {
11 sender().send((p.into(), MessageType::Append, data.into())).valid();
12 }
13 pub fn Archive(args: impl CompressArgs) {
14 let (p, data, level) = args.get();
15 sender().send((p, MessageType::ComprW(level), data)).valid();
16 }
17
18 type Args = (Astr, Arc<[u8]>, i32);
19 pub trait CompressArgs {
20 fn get(self) -> Args;
21 }
22 impl<T: Into<Arc<[u8]>>, F: Into<Astr>, C> CompressArgs for (F, T, C)
23 where
24 i32: Cast<C>,
25 {
26 fn get(self) -> Args {
27 (self.0.into(), self.1.into(), i32(self.2).clamp(1, 22))
28 }
29 }
30 impl<T: Into<Arc<[u8]>>, F: Into<Astr>> CompressArgs for (F, T) {
31 fn get(self) -> Args {
32 (self.0.into(), self.1.into(), 1)
33 }
34 }
35
36 enum MessageType {
37 Write,
38 Append,
39 ComprW(i32),
40 Close,
41 }
42 type Message = (Astr, MessageType, Arc<[u8]>);
43 fn sender() -> &'static Sender<Message> {
44 use MessageType::*;
45 static SENDER: OnceLock<Sender<Message>> = OnceLock::new();
46 SENDER.get_or_init(|| {
47 let (sn, mut rx) = chan::unbounded::<Message>();
48 let writer = task::Runtime().spawn(async move || {
49 while let Some(msg) = rx.recv().await {
50 let (name, operation, data) = msg;
51 let file = match operation {
52 Close => break,
53 Write | ComprW(_) => fs::File::create(&*name).await,
54 Append => fs::OpenOptions::new().append(true).create(true).open(&*name).await,
55 };
56
57 let Ok(mut file) = file else {
58 let name: PathBuf = (*name).into();
59 FAIL!({ continue }, "{:?}", fmt_err(file, &name));
60 };
61
62 let data = if let ComprW(l) = operation {
63 task::spawn_blocking(move || zstd::encode_all(&data[..], l))
64 .await
65 .fail()
66 .explain_err(|| format!("Cannot encode file {name:?}"))
67 .warn()
68 .into()
69 } else {
70 data
71 };
72
73 file.write_all(&data).await.explain_err(|| format!("Cannot write {name:?}")).warn();
74 file.sync_all().await.explain_err(|| format!("Cannot sync {name:?}")).warn();
75 }
76 });
77
78 logger::Logger::shutdown_hook({
79 let sn = sn.clone();
80 move || {
81 sn.send(("".into(), Close, Def())).expect("E| Cannot close async write system");
82 task::Runtime().finish(writer);
83 }
84 });
85
86 sn
87 })
88 }
89}
90
91pub mod Load {
92 use {super::*, std::fs};
93 pub fn File(p: impl AsRef<Path>) -> Res<Vec<u8>> {
94 p.as_ref().pipe(|p| fmt_err(fs::read(p), p))
95 }
96 pub fn Text(p: impl AsRef<Path>) -> Res<String> {
97 p.as_ref().pipe(|p| fmt_err(fs::read_to_string(p), p))
98 }
99 pub fn Archive(p: impl AsRef<Path>) -> Res<Vec<u8>> {
100 p.as_ref().pipe(|p| fmt_err(fs::File::open(p).and_then(zstd::decode_all), p))
101 }
102}
103
104pub mod Lazy {
105 use super::*;
106 pub fn File(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
107 lazy_read(p, read_file)
108 }
109 pub fn Text(p: impl Into<Astr>) -> impl Stream<Item = String> {
110 lazy_read(p, read_text)
111 }
112 pub fn Archive(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
113 let p = p.into();
114 lazy_read(p.clone(), read_file).map(move |data| data.pipe_as(zstd::decode_all).explain_err(|| format!("Cannot decode archive {p:?}")).warn())
115 }
116}
117
118pub mod Watch {
119 use super::*;
120 pub fn File(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
121 watch_file(p, read_file)
122 }
123 pub fn Text(p: impl Into<Astr>) -> impl Stream<Item = String> {
124 watch_file(p, read_text)
125 }
126 pub fn Archive(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
127 let p = p.into();
128 watch_file(p.clone(), read_file).map(move |data| data.pipe_as(zstd::decode_all).explain_err(|| format!("Cannot decode archive {p:?}")).warn())
129 }
130}
131
132fn lazy_read<T: Default>(p: impl Into<Astr>, loader: impl AsyncFnOnce(APath) -> Res<T>) -> impl Stream<Item = T> {
133 stream::once(async {
134 let p = p.into().pipe_as(PathBuf::from).into();
135 loader(p).await.warn()
136 })
137}
138
139fn watch_file<T>(p: impl Into<Astr>, loader: impl AsyncFnOnce(APath) -> Res<T> + Clone) -> impl Stream<Item = T> {
140 let rx = Notify::new().pipe(Arc);
141 let p: APath = p.into().pipe_as(PathBuf::from).into();
142
143 stream::unfold(None, move |w| {
144 let (p, loader, _sn, rx) = (p.clone(), loader.clone(), rx.clone().pipe(Some), rx.clone());
145 async move {
146 let first = w.is_none();
147 if let Some(_w) = w {
148 let _ = rx.notified().await;
149 DEBUG!("File {p:?} changed");
150 }
151
152 let t = fs::metadata(&p).await.and_then(|m| m.modified());
153 let mut _sn = _sn.map(|s| (s, t));
154
155 while !first && !p.exists() {
156 task::sleep_ms(100).await;
157 }
158
159 #[allow(clippy::redundant_closure)]
160 let file = p.clone().pipe(|p| loader(p)).await.map_err(|e| FAIL!(e));
161 DEBUG!("File {p:?} loaded");
162
163 let w = {
164 #[cfg(feature = "fsnotify")]
165 {
166 while !first && !p.exists() {
167 task::sleep_ms(100).await;
168 }
169 use notify::*;
170
171 recommended_watcher({
172 let p = p.clone();
173 move |r| match r {
174 Ok(_) => {
175 match (_sn.as_ref(), std::fs::metadata(&p).and_then(|m| m.modified())) {
176 (Some((_, Ok(mtime))), Ok(t)) if &t == mtime => return,
177 _ => (),
178 }
179 _sn.take().map(|(s, _)| s.notify_one()).sink()
180 }
181 Err(e) => FAIL!("File {p:?}: {e}"),
182 }
183 })
184 .map_err(|e| FAIL!("Watch {p:?}: {e}"))
185 .ok()
186 .map(|mut w| {
187 w.watch(&p, RecursiveMode::NonRecursive).unwrap_or_else(|_| FAIL!("Cannot watch {p:?}"));
188 w
189 })
190 .pipe(Some)
191 }
192 #[cfg(not(feature = "fsnotify"))]
193 {
194 WARN!("Enable fsnotify feature to watch files");
195 Some(())
196 }
197 };
198
199 file.ok().map(|f| (f, w))
200 }
201 })
202}
203
204async fn read_file(p: APath) -> Res<Vec<u8>> {
205 async fn read(p: &Path) -> Res<Vec<u8>> {
206 let (mut f, mut b) = (fs::File::open(p).await.res()?, vec![]);
207 f.read_to_end(&mut b).await.res()?;
208 Ok(b)
209 }
210 fmt_err(read(&p).await, &p)
211}
212async fn read_text(p: APath) -> Res<String> {
213 async fn read(p: &Path) -> Res<String> {
214 let (mut f, mut b) = (fs::File::open(p).await.res()?, Def());
215 f.read_to_string(&mut b).await.res()?;
216 Ok(b)
217 }
218 fmt_err(read(&p).await, &p)
219}
220
221fn fmt_err<T>(r: Result<T, impl Display>, p: &Path) -> Res<T> {
222 r.explain_err(|| format!("Cannot open file {p:?}"))
223}
224
225type APath = Arc<Path>;
226
227#[cfg(feature = "zstd")]
228mod zstd {
229 pub use zstd::stream::*;
230}
231#[cfg(not(feature = "zstd"))]
232mod zstd {
233 use super::*;
234 pub fn encode_all(s: &[u8], _: i32) -> Res<Vec<u8>> {
235 s.to_vec().pipe(Ok)
236 }
237 pub fn decode_all<R: std::io::Read>(mut s: R) -> Result<Vec<u8>, io::Error> {
238 let mut b = vec![];
239 s.read_to_end(&mut b)?;
240 Ok(b)
241 }
242}