// grafix_toolbox/kit/policies/func/file.rs

1use crate::{asyn::*, lib::*};
2use std::path::{Path, PathBuf};
3
4pub mod Save {
5	use super::*;
6
7	pub fn Write(p: impl Into<Astr>, data: impl Into<Arc<[u8]>>) {
8		sender().send((p.into(), MessageType::Write, data.into())).expect(FAILED_WRITE);
9	}
10	pub fn Append(p: impl Into<Astr>, data: impl Into<Arc<[u8]>>) {
11		sender().send((p.into(), MessageType::Append, data.into())).expect(FAILED_WRITE);
12	}
13	pub fn Archive(args: impl CompressArgs) {
14		let (p, data, level) = args.get();
15		sender().send((p, MessageType::ComprW(level), data)).expect(FAILED_WRITE);
16	}
17
18	type Args = (Astr, Arc<[u8]>, i32);
19	pub trait CompressArgs {
20		fn get(self) -> Args;
21	}
22	impl<T: Into<Arc<[u8]>>, F: Into<Astr>, C> CompressArgs for (F, T, C)
23	where
24		i32: Cast<C>,
25	{
26		fn get(self) -> Args {
27			(self.0.into(), self.1.into(), i32(self.2).clamp(1, 22))
28		}
29	}
30	impl<T: Into<Arc<[u8]>>, F: Into<Astr>> CompressArgs for (F, T) {
31		fn get(self) -> Args {
32			(self.0.into(), self.1.into(), 1)
33		}
34	}
35
36	enum MessageType {
37		Write,
38		Append,
39		ComprW(i32),
40		Close,
41	}
42	type Message = (Astr, MessageType, Arc<[u8]>);
43	fn sender() -> &'static Sender<Message> {
44		use MessageType::*;
45		static SENDER: OnceLock<Sender<Message>> = OnceLock::new();
46		SENDER.get_or_init(|| {
47			let (sn, mut rx) = chan::unbounded::<Message>();
48			let writer = task::Runtime().spawn(async move {
49				while let Some(msg) = rx.recv().await {
50					let (name, operation, data) = msg;
51					let file = match operation {
52						Close => break,
53						Write | ComprW(_) => fs::File::create(&*name).await,
54						Append => fs::OpenOptions::new().append(true).create(true).open(&*name).await,
55					};
56
57					if let Ok(mut file) = file {
58						let data = if let ComprW(l) = operation {
59							task::spawn_blocking(move || zstd::stream::encode_all(&data[..], l))
60								.await
61								.fail()
62								.explain_err(|| format!("Cannot encode data for file {name:?}"))
63								.warn()
64								.into()
65						} else {
66							data
67						};
68
69						file.write_all(&data).await.explain_err(|| format!("Cannot write {name:?}")).warn();
70						file.sync_all().await.explain_err(|| format!("Cannot sync {name:?}")).warn();
71					} else {
72						let name: PathBuf = (*name).into();
73						FAIL!("{:?}", fmt_err(file, &name));
74					}
75				}
76			});
77
78			logging::Logger::shutdown_hook({
79				let sn = sn.clone();
80				move || {
81					sn.send(("".into(), Close, vec![].into())).expect("E| Cannot close async write system");
82					task::Runtime().finish(writer);
83				}
84			});
85
86			sn
87		})
88	}
89}
90
91pub mod Load {
92	use super::*;
93	pub fn File(p: impl AsRef<Path>) -> Res<Vec<u8>> {
94		let p = p.as_ref();
95		fmt_err(std::fs::read(p), p)
96	}
97	pub fn Text(p: impl AsRef<Path>) -> Res<String> {
98		let p = p.as_ref();
99		fmt_err(std::fs::read_to_string(p), p)
100	}
101	pub fn Archive(p: impl AsRef<Path>) -> Res<Vec<u8>> {
102		let decode = |p| -> Res<_> {
103			let f = Res(std::fs::File::open(p))?;
104			let b = Res(zstd::stream::decode_all(f))?;
105			Ok(b)
106		};
107		let p = p.as_ref();
108		fmt_err(decode(p), p)
109	}
110}
111
112pub mod Lazy {
113	use super::*;
114	pub fn File(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
115		lazy_read(p, read_file)
116	}
117	pub fn Text(p: impl Into<Astr>) -> impl Stream<Item = String> {
118		lazy_read(p, read_text)
119	}
120	pub fn Archive(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
121		let p = p.into();
122		lazy_read(p.clone(), read_file).map(move |data| zstd::stream::decode_all(&data[..]).explain_err(|| format!("Cannot decode archive {p:?}")).warn())
123	}
124}
125
126pub mod Watch {
127	use super::*;
128	pub fn File(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
129		watch_file(p, read_file)
130	}
131	pub fn Text(p: impl Into<Astr>) -> impl Stream<Item = String> {
132		watch_file(p, read_text)
133	}
134	pub fn Archive(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
135		let p = p.into();
136		watch_file(p.clone(), read_file).map(move |data| zstd::stream::decode_all(&data[..]).explain_err(|| format!("Cannot decode archive {p:?}")).warn())
137	}
138}
139
140fn lazy_read<T: Default, F: Future<Output = Res<T>>>(p: impl Into<Astr>, loader: impl FnOnce(Arc<Path>) -> F) -> impl Stream<Item = T> {
141	stream::once_future(async move {
142		let p = PathBuf::from(&*p.into()).into();
143		match loader(p).await {
144			Ok(file) => file,
145			e @ Err(_) => e.warn(),
146		}
147	})
148}
149
150fn watch_file<T, F: Future<Output = Res<T>>>(p: impl Into<Astr>, loader: impl FnOnce(Arc<Path>) -> F + Clone) -> impl Stream<Item = T> {
151	let rx = Arc::new(Notify::new());
152	let p: Arc<Path> = PathBuf::from(&*p.into()).into();
153
154	stream::unfold(None, move |w| {
155		let (p, l, _sn, rx) = (p.clone(), loader.clone(), Some(rx.clone()), rx.clone());
156		async move {
157			let first = w.is_none();
158			if let Some(_w) = w {
159				let _ = rx.notified().await;
160				DEBUG!("File {p:?} changed");
161			}
162
163			let t = fs::metadata(p.clone()).await.and_then(|m| m.modified());
164			let mut _sn = _sn.map(|s| (s, t));
165
166			while !first && !p.exists() {
167				task::sleep(time::Duration::from_millis(100)).await;
168			}
169
170			let file = l(p.clone()).await.map_err(|e| FAIL!(e));
171			DEBUG!("File {p:?} loaded");
172
173			let w = {
174				#[cfg(feature = "fsnotify")]
175				{
176					use notify::*;
177					let p = p.clone();
178					let mut w = {
179						let p = p.clone();
180						Res(recommended_watcher(move |r| match r {
181							Ok(_) => {
182								match (_sn.as_ref(), std::fs::metadata(p.clone()).and_then(|m| m.modified())) {
183									(Some((_, Ok(mtime))), Ok(t)) if &t == mtime => return (),
184									_ => (),
185								}
186								_sn.take().map(|(s, _)| s.notify_one()).sink()
187							}
188							Err(e) => FAIL!("File {p:?}: {e}"),
189						}))
190					}
191					.map_err(|e| FAIL!("Watch {p:?}: {e}"))
192					.ok();
193
194					while !first && !p.exists() {
195						task::sleep(time::Duration::from_millis(100)).await;
196					}
197					w.as_mut().map(|w| w.watch(&p, RecursiveMode::NonRecursive).unwrap_or_else(|_| FAIL!("Cannot watch {p:?}")));
198					Some(w)
199				}
200				#[cfg(not(feature = "fsnotify"))]
201				{
202					FAIL!("Enable fsnotify feature to watch files");
203					Some(())
204				}
205			};
206
207			file.ok().map(|f| (f, w))
208		}
209	})
210}
211
212async fn read_file(p: Arc<Path>) -> Res<Vec<u8>> {
213	async fn read(p: &Path) -> Res<Vec<u8>> {
214		let (mut f, mut b) = (Res(fs::File::open(p).await)?, vec![]);
215		Res(f.read_to_end(&mut b).await)?;
216		Ok(b)
217	}
218	fmt_err(read(&p).await, &p)
219}
220async fn read_text(p: Arc<Path>) -> Res<String> {
221	async fn read(p: &Path) -> Res<String> {
222		let (mut f, mut b) = (Res(fs::File::open(p).await)?, String::new());
223		Res(f.read_to_string(&mut b).await)?;
224		Ok(b)
225	}
226	fmt_err(read(&p).await, &p)
227}
228
229fn fmt_err<T>(r: Result<T, impl std::fmt::Display>, p: &Path) -> Res<T> {
230	r.explain_err(|| format!("Cannot open file {p:?}"))
231}
232
233const FAILED_WRITE: STR = "E| Cannot send write";
234
235#[cfg(not(feature = "zstd"))]
236mod zstd {
237	pub mod stream {
238		use super::super::*;
239		pub fn encode_all(s: &[u8], _: i32) -> Res<Vec<u8>> {
240			Ok(s.to_vec())
241		}
242		pub fn decode_all<R: std::io::Read>(mut s: R) -> Res<Vec<u8>> {
243			let mut b = vec![];
244			Res(s.read_to_end(&mut b))?;
245			Ok(b)
246		}
247	}
248}