use crate::filesystem::*;
use melodium_core::*;
use melodium_macro::mel_treatment;
use std::sync::Arc;
#[mel_treatment(
input path Block<string>
input filesystem Block<FileSystem>
output data Stream<byte>
output reached Block<void>
output completed Block<void>
output failed Block<void>
output finished Block<void>
output errors Stream<string>
)]
pub async fn read() {
if let (Ok(filesystem), Ok(path)) = (
filesystem.recv_one().await.map(|val| {
GetData::<Arc<dyn Data>>::try_data(val)
.unwrap()
.downcast_arc::<FileSystem>()
.unwrap()
}),
path.recv_one()
.await
.map(|val| GetData::<string>::try_data(val).unwrap()),
) {
filesystem
.filesystem
.read_file(
&path,
Box::new(|content: VecDeque<u8>| {
Box::pin(async {
data.send_many(TransmissionValue::Byte(content))
.await
.map_err(|_| ())
})
}),
Box::new(|| {
Box::pin(async {
let _ = reached.send_one(().into()).await;
})
}),
Box::new(|| {
Box::pin(async {
reached.close().await;
})
}),
Box::new(|| {
Box::pin(async {
let _ = completed.send_one(().into()).await;
})
}),
Box::new(|| {
Box::pin(async {
let _ = failed.send_one(().into()).await;
})
}),
Box::new(|| {
Box::pin(async {
let _ = finished.send_one(().into()).await;
})
}),
Box::new(|msg: String| {
Box::pin(async { errors.send_one(msg.into()).await.map_err(|_| ()) })
}),
)
.await
}
}
#[mel_treatment(
default append false
default create true
default new false
input path Block<string>
input filesystem Block<FileSystem>
input data Stream<byte>
output completed Block<void>
output failed Block<void>
output finished Block<void>
output errors Stream<string>
output amount Stream<u128>
)]
pub async fn write(append: bool, create: bool, new: bool) {
if let (Ok(filesystem), Ok(path)) = (
filesystem.recv_one().await.map(|val| {
GetData::<Arc<dyn Data>>::try_data(val)
.unwrap()
.downcast_arc::<FileSystem>()
.unwrap()
}),
path.recv_one()
.await
.map(|val| GetData::<string>::try_data(val).unwrap()),
) {
filesystem
.filesystem
.write_file(
&path,
append,
create,
new,
Box::new(|| {
Box::pin(async {
data.recv_many()
.await
.map(|values| TryInto::<Vec<u8>>::try_into(values).unwrap())
.map_err(|_| ())
})
}),
Box::new(|amt: u128| {
Box::pin({
let amount = &amount;
async move { amount.send_one(amt.into()).await.map_err(|_| ()) }
})
}),
Box::new(|| {
Box::pin(async {
let _ = completed.send_one(().into()).await;
})
}),
Box::new(|| {
Box::pin(async {
let _ = failed.send_one(().into()).await;
})
}),
Box::new(|| {
Box::pin(async {
let _ = finished.send_one(().into()).await;
})
}),
Box::new(|msg: String| {
Box::pin(async {
let _ = errors.send_one(msg.into()).await.map_err(|_| ());
errors.force_send().await;
Ok(())
})
}),
)
.await
}
}