grafix_toolbox/kit/policies/func/
file.rs1use crate::{asyn::*, lib::*};
2use std::path::{Path, PathBuf};
3
4pub mod Save {
5 use super::*;
6
7 pub fn Write(p: impl Into<Astr>, data: impl Into<Arc<[u8]>>) {
8 sender().send((p.into(), MessageType::Write, data.into())).expect(FAILED_WRITE);
9 }
10 pub fn Append(p: impl Into<Astr>, data: impl Into<Arc<[u8]>>) {
11 sender().send((p.into(), MessageType::Append, data.into())).expect(FAILED_WRITE);
12 }
13 pub fn Archive(args: impl CompressArgs) {
14 let (p, data, level) = args.get();
15 sender().send((p, MessageType::ComprW(level), data)).expect(FAILED_WRITE);
16 }
17
18 type Args = (Astr, Arc<[u8]>, i32);
19 pub trait CompressArgs {
20 fn get(self) -> Args;
21 }
22 impl<T: Into<Arc<[u8]>>, F: Into<Astr>, C> CompressArgs for (F, T, C)
23 where
24 i32: Cast<C>,
25 {
26 fn get(self) -> Args {
27 (self.0.into(), self.1.into(), i32(self.2).clamp(1, 22))
28 }
29 }
30 impl<T: Into<Arc<[u8]>>, F: Into<Astr>> CompressArgs for (F, T) {
31 fn get(self) -> Args {
32 (self.0.into(), self.1.into(), 1)
33 }
34 }
35
36 enum MessageType {
37 Write,
38 Append,
39 ComprW(i32),
40 Close,
41 }
42 type Message = (Astr, MessageType, Arc<[u8]>);
43 fn sender() -> &'static Sender<Message> {
44 use MessageType::*;
45 static SENDER: OnceLock<Sender<Message>> = OnceLock::new();
46 SENDER.get_or_init(|| {
47 let (sn, mut rx) = chan::unbounded::<Message>();
48 let writer = task::Runtime().spawn(async move {
49 while let Some(msg) = rx.recv().await {
50 let (name, operation, data) = msg;
51 let file = match operation {
52 Close => break,
53 Write | ComprW(_) => fs::File::create(&*name).await,
54 Append => fs::OpenOptions::new().append(true).create(true).open(&*name).await,
55 };
56
57 if let Ok(mut file) = file {
58 let data = if let ComprW(l) = operation {
59 task::spawn_blocking(move || zstd::stream::encode_all(&data[..], l))
60 .await
61 .fail()
62 .explain_err(|| format!("Cannot encode data for file {name:?}"))
63 .warn()
64 .into()
65 } else {
66 data
67 };
68
69 file.write_all(&data).await.explain_err(|| format!("Cannot write {name:?}")).warn();
70 file.sync_all().await.explain_err(|| format!("Cannot sync {name:?}")).warn();
71 } else {
72 let name: PathBuf = (*name).into();
73 FAIL!("{:?}", fmt_err(file, &name));
74 }
75 }
76 });
77
78 logging::Logger::shutdown_hook({
79 let sn = sn.clone();
80 move || {
81 sn.send(("".into(), Close, vec![].into())).expect("E| Cannot close async write system");
82 task::Runtime().finish(writer);
83 }
84 });
85
86 sn
87 })
88 }
89}
90
91pub mod Load {
92 use super::*;
93 pub fn File(p: impl AsRef<Path>) -> Res<Vec<u8>> {
94 let p = p.as_ref();
95 fmt_err(std::fs::read(p), p)
96 }
97 pub fn Text(p: impl AsRef<Path>) -> Res<String> {
98 let p = p.as_ref();
99 fmt_err(std::fs::read_to_string(p), p)
100 }
101 pub fn Archive(p: impl AsRef<Path>) -> Res<Vec<u8>> {
102 let decode = |p| -> Res<_> {
103 let f = Res(std::fs::File::open(p))?;
104 let b = Res(zstd::stream::decode_all(f))?;
105 Ok(b)
106 };
107 let p = p.as_ref();
108 fmt_err(decode(p), p)
109 }
110}
111
112pub mod Lazy {
113 use super::*;
114 pub fn File(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
115 lazy_read(p, read_file)
116 }
117 pub fn Text(p: impl Into<Astr>) -> impl Stream<Item = String> {
118 lazy_read(p, read_text)
119 }
120 pub fn Archive(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
121 let p = p.into();
122 lazy_read(p.clone(), read_file).map(move |data| zstd::stream::decode_all(&data[..]).explain_err(|| format!("Cannot decode archive {p:?}")).warn())
123 }
124}
125
126pub mod Watch {
127 use super::*;
128 pub fn File(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
129 watch_file(p, read_file)
130 }
131 pub fn Text(p: impl Into<Astr>) -> impl Stream<Item = String> {
132 watch_file(p, read_text)
133 }
134 pub fn Archive(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
135 let p = p.into();
136 watch_file(p.clone(), read_file).map(move |data| zstd::stream::decode_all(&data[..]).explain_err(|| format!("Cannot decode archive {p:?}")).warn())
137 }
138}
139
140fn lazy_read<T: Default, F: Future<Output = Res<T>>>(p: impl Into<Astr>, loader: impl FnOnce(Arc<Path>) -> F) -> impl Stream<Item = T> {
141 stream::once_future(async move {
142 let p = PathBuf::from(&*p.into()).into();
143 match loader(p).await {
144 Ok(file) => file,
145 e @ Err(_) => e.warn(),
146 }
147 })
148}
149
150fn watch_file<T, F: Future<Output = Res<T>>>(p: impl Into<Astr>, loader: impl FnOnce(Arc<Path>) -> F + Clone) -> impl Stream<Item = T> {
151 let rx = Arc::new(Notify::new());
152 let p: Arc<Path> = PathBuf::from(&*p.into()).into();
153
154 stream::unfold(None, move |w| {
155 let (p, l, _sn, rx) = (p.clone(), loader.clone(), Some(rx.clone()), rx.clone());
156 async move {
157 let first = w.is_none();
158 if let Some(_w) = w {
159 let _ = rx.notified().await;
160 DEBUG!("File {p:?} changed");
161 }
162
163 let t = fs::metadata(p.clone()).await.and_then(|m| m.modified());
164 let mut _sn = _sn.map(|s| (s, t));
165
166 while !first && !p.exists() {
167 task::sleep(time::Duration::from_millis(100)).await;
168 }
169
170 let file = l(p.clone()).await.map_err(|e| FAIL!(e));
171 DEBUG!("File {p:?} loaded");
172
173 let w = {
174 #[cfg(feature = "fsnotify")]
175 {
176 use notify::*;
177 let p = p.clone();
178 let mut w = {
179 let p = p.clone();
180 Res(recommended_watcher(move |r| match r {
181 Ok(_) => {
182 match (_sn.as_ref(), std::fs::metadata(p.clone()).and_then(|m| m.modified())) {
183 (Some((_, Ok(mtime))), Ok(t)) if &t == mtime => return (),
184 _ => (),
185 }
186 _sn.take().map(|(s, _)| s.notify_one()).sink()
187 }
188 Err(e) => FAIL!("File {p:?}: {e}"),
189 }))
190 }
191 .map_err(|e| FAIL!("Watch {p:?}: {e}"))
192 .ok();
193
194 while !first && !p.exists() {
195 task::sleep(time::Duration::from_millis(100)).await;
196 }
197 w.as_mut().map(|w| w.watch(&p, RecursiveMode::NonRecursive).unwrap_or_else(|_| FAIL!("Cannot watch {p:?}")));
198 Some(w)
199 }
200 #[cfg(not(feature = "fsnotify"))]
201 {
202 FAIL!("Enable fsnotify feature to watch files");
203 Some(())
204 }
205 };
206
207 file.ok().map(|f| (f, w))
208 }
209 })
210}
211
212async fn read_file(p: Arc<Path>) -> Res<Vec<u8>> {
213 async fn read(p: &Path) -> Res<Vec<u8>> {
214 let (mut f, mut b) = (Res(fs::File::open(p).await)?, vec![]);
215 Res(f.read_to_end(&mut b).await)?;
216 Ok(b)
217 }
218 fmt_err(read(&p).await, &p)
219}
220async fn read_text(p: Arc<Path>) -> Res<String> {
221 async fn read(p: &Path) -> Res<String> {
222 let (mut f, mut b) = (Res(fs::File::open(p).await)?, String::new());
223 Res(f.read_to_string(&mut b).await)?;
224 Ok(b)
225 }
226 fmt_err(read(&p).await, &p)
227}
228
229fn fmt_err<T>(r: Result<T, impl std::fmt::Display>, p: &Path) -> Res<T> {
230 r.explain_err(|| format!("Cannot open file {p:?}"))
231}
232
233const FAILED_WRITE: STR = "E| Cannot send write";
234
235#[cfg(not(feature = "zstd"))]
236mod zstd {
237 pub mod stream {
238 use super::super::*;
239 pub fn encode_all(s: &[u8], _: i32) -> Res<Vec<u8>> {
240 Ok(s.to_vec())
241 }
242 pub fn decode_all<R: std::io::Read>(mut s: R) -> Res<Vec<u8>> {
243 let mut b = vec![];
244 Res(s.read_to_end(&mut b))?;
245 Ok(b)
246 }
247 }
248}