Skip to main content

grafix_toolbox/kit/policies/func/
file.rs

1use crate::{asyn::*, lib::*};
2use std::path::{Path, PathBuf};
3
4pub mod Save {
5	use super::*;
6
7	pub fn Write(p: impl Into<Astr>, data: impl Into<Arc<[u8]>>) {
8		sender().send((p.into(), MessageType::Write, data.into())).valid();
9	}
10	pub fn Append(p: impl Into<Astr>, data: impl Into<Arc<[u8]>>) {
11		sender().send((p.into(), MessageType::Append, data.into())).valid();
12	}
13	pub fn Archive(args: impl CompressArgs) {
14		let (p, data, level) = args.get();
15		sender().send((p, MessageType::ComprW(level), data)).valid();
16	}
17
18	type Args = (Astr, Arc<[u8]>, i32);
19	pub trait CompressArgs {
20		fn get(self) -> Args;
21	}
22	impl<T: Into<Arc<[u8]>>, F: Into<Astr>, C> CompressArgs for (F, T, C)
23	where
24		i32: Cast<C>,
25	{
26		fn get(self) -> Args {
27			(self.0.into(), self.1.into(), i32(self.2).clamp(1, 22))
28		}
29	}
30	impl<T: Into<Arc<[u8]>>, F: Into<Astr>> CompressArgs for (F, T) {
31		fn get(self) -> Args {
32			(self.0.into(), self.1.into(), 1)
33		}
34	}
35
36	enum MessageType {
37		Write,
38		Append,
39		ComprW(i32),
40		Close,
41	}
42	type Message = (Astr, MessageType, Arc<[u8]>);
43	fn sender() -> &'static Sender<Message> {
44		use MessageType::*;
45		static SENDER: OnceLock<Sender<Message>> = OnceLock::new();
46		SENDER.get_or_init(|| {
47			let (sn, mut rx) = chan::unbounded::<Message>();
48			let writer = task::Runtime().spawn(async move || {
49				while let Some(msg) = rx.recv().await {
50					let (name, operation, data) = msg;
51					let file = match operation {
52						Close => break,
53						Write | ComprW(_) => fs::File::create(&*name).await,
54						Append => fs::OpenOptions::new().append(true).create(true).open(&*name).await,
55					};
56
57					let Ok(mut file) = file else {
58						let name: PathBuf = (*name).into();
59						FAIL!({ continue }, "{:?}", fmt_err(file, &name));
60					};
61
62					let data = if let ComprW(l) = operation {
63						task::spawn_blocking(move || zstd::encode_all(&data[..], l))
64							.await
65							.fail()
66							.explain_err(|| format!("Cannot encode file {name:?}"))
67							.warn()
68							.into()
69					} else {
70						data
71					};
72
73					file.write_all(&data).await.explain_err(|| format!("Cannot write {name:?}")).warn();
74					file.sync_all().await.explain_err(|| format!("Cannot sync {name:?}")).warn();
75				}
76			});
77
78			logger::Logger::shutdown_hook({
79				let sn = sn.clone();
80				move || {
81					sn.send(("".into(), Close, Def())).expect("E| Cannot close async write system");
82					task::Runtime().finish(writer);
83				}
84			});
85
86			sn
87		})
88	}
89}
90
91pub mod Load {
92	use {super::*, std::fs};
93	pub fn File(p: impl AsRef<Path>) -> Res<Vec<u8>> {
94		p.as_ref().pipe(|p| fmt_err(fs::read(p), p))
95	}
96	pub fn Text(p: impl AsRef<Path>) -> Res<String> {
97		p.as_ref().pipe(|p| fmt_err(fs::read_to_string(p), p))
98	}
99	pub fn Archive(p: impl AsRef<Path>) -> Res<Vec<u8>> {
100		p.as_ref().pipe(|p| fmt_err(fs::File::open(p).and_then(zstd::decode_all), p))
101	}
102}
103
104pub mod Lazy {
105	use super::*;
106	pub fn File(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
107		lazy_read(p, read_file)
108	}
109	pub fn Text(p: impl Into<Astr>) -> impl Stream<Item = String> {
110		lazy_read(p, read_text)
111	}
112	pub fn Archive(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
113		let p = p.into();
114		lazy_read(p.clone(), read_file).map(move |data| data.pipe_as(zstd::decode_all).explain_err(|| format!("Cannot decode archive {p:?}")).warn())
115	}
116}
117
118pub mod Watch {
119	use super::*;
120	pub fn File(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
121		watch_file(p, read_file)
122	}
123	pub fn Text(p: impl Into<Astr>) -> impl Stream<Item = String> {
124		watch_file(p, read_text)
125	}
126	pub fn Archive(p: impl Into<Astr>) -> impl Stream<Item = Vec<u8>> {
127		let p = p.into();
128		watch_file(p.clone(), read_file).map(move |data| data.pipe_as(zstd::decode_all).explain_err(|| format!("Cannot decode archive {p:?}")).warn())
129	}
130}
131
132fn lazy_read<T: Default>(p: impl Into<Astr>, loader: impl AsyncFnOnce(APath) -> Res<T>) -> impl Stream<Item = T> {
133	stream::once(async {
134		let p = p.into().pipe_as(PathBuf::from).into();
135		loader(p).await.warn()
136	})
137}
138
139fn watch_file<T>(p: impl Into<Astr>, loader: impl AsyncFnOnce(APath) -> Res<T> + Clone) -> impl Stream<Item = T> {
140	let rx = Notify::new().pipe(Arc);
141	let p: APath = p.into().pipe_as(PathBuf::from).into();
142
143	stream::unfold(None, move |w| {
144		let (p, loader, _sn, rx) = (p.clone(), loader.clone(), rx.clone().pipe(Some), rx.clone());
145		async move {
146			let first = w.is_none();
147			if let Some(_w) = w {
148				let _ = rx.notified().await;
149				DEBUG!("File {p:?} changed");
150			}
151
152			let t = fs::metadata(&p).await.and_then(|m| m.modified());
153			let mut _sn = _sn.map(|s| (s, t));
154
155			while !first && !p.exists() {
156				task::sleep_ms(100).await;
157			}
158
159			#[allow(clippy::redundant_closure)]
160			let file = p.clone().pipe(|p| loader(p)).await.map_err(|e| FAIL!(e));
161			DEBUG!("File {p:?} loaded");
162
163			let w = {
164				#[cfg(feature = "fsnotify")]
165				{
166					while !first && !p.exists() {
167						task::sleep_ms(100).await;
168					}
169					use notify::*;
170
171					recommended_watcher({
172						let p = p.clone();
173						move |r| match r {
174							Ok(_) => {
175								match (_sn.as_ref(), std::fs::metadata(&p).and_then(|m| m.modified())) {
176									(Some((_, Ok(mtime))), Ok(t)) if &t == mtime => return,
177									_ => (),
178								}
179								_sn.take().map(|(s, _)| s.notify_one()).sink()
180							}
181							Err(e) => FAIL!("File {p:?}: {e}"),
182						}
183					})
184					.map_err(|e| FAIL!("Watch {p:?}: {e}"))
185					.ok()
186					.map(|mut w| {
187						w.watch(&p, RecursiveMode::NonRecursive).unwrap_or_else(|_| FAIL!("Cannot watch {p:?}"));
188						w
189					})
190					.pipe(Some)
191				}
192				#[cfg(not(feature = "fsnotify"))]
193				{
194					WARN!("Enable fsnotify feature to watch files");
195					Some(())
196				}
197			};
198
199			file.ok().map(|f| (f, w))
200		}
201	})
202}
203
204async fn read_file(p: APath) -> Res<Vec<u8>> {
205	async fn read(p: &Path) -> Res<Vec<u8>> {
206		let (mut f, mut b) = (fs::File::open(p).await.res()?, vec![]);
207		f.read_to_end(&mut b).await.res()?;
208		Ok(b)
209	}
210	fmt_err(read(&p).await, &p)
211}
212async fn read_text(p: APath) -> Res<String> {
213	async fn read(p: &Path) -> Res<String> {
214		let (mut f, mut b) = (fs::File::open(p).await.res()?, Def());
215		f.read_to_string(&mut b).await.res()?;
216		Ok(b)
217	}
218	fmt_err(read(&p).await, &p)
219}
220
221fn fmt_err<T>(r: Result<T, impl Display>, p: &Path) -> Res<T> {
222	r.explain_err(|| format!("Cannot open file {p:?}"))
223}
224
225type APath = Arc<Path>;
226
227#[cfg(feature = "zstd")]
228mod zstd {
229	pub use zstd::stream::*;
230}
231#[cfg(not(feature = "zstd"))]
232mod zstd {
233	use super::*;
234	pub fn encode_all(s: &[u8], _: i32) -> Res<Vec<u8>> {
235		s.to_vec().pipe(Ok)
236	}
237	pub fn decode_all<R: std::io::Read>(mut s: R) -> Result<Vec<u8>, io::Error> {
238		let mut b = vec![];
239		s.read_to_end(&mut b)?;
240		Ok(b)
241	}
242}