object_rainbow_fetchall/
lib.rs1use std::{
2 collections::{BTreeMap, btree_map},
3 pin::Pin,
4};
5
6use async_executor::Executor;
7use flume::Sender;
8use futures_channel::oneshot;
9use object_rainbow::{Hash, ObjectHashes, PointVisitor, SingularFetch, ToOutput, Traversible};
10use object_rainbow_local_map::LocalMap;
11
12type Dependency = Box<
13 dyn 'static
14 + Send
15 + FnOnce(Context<'_>) -> Pin<Box<dyn '_ + Send + Future<Output = object_rainbow::Result<()>>>>,
16>;
17
/// Messages sent from [`Context::save_object`] to the request loop inside `fetchall`.
///
/// Every variant carries a `callback` on which the loop reports the outcome.
enum Request {
    /// Asks the loop to run the given dependency jobs (keyed by hash) and to reply once
    /// they have all been awaited, or with the first error encountered.
    // NOTE(review): the variant name is misspelled ("Depencencies"); renaming it requires
    // touching every use site in this file at once.
    Depencencies {
        /// Jobs to run, keyed by the hash of the point each one fetches.
        dependencies: BTreeMap<Hash, Dependency>,
        /// Receives the combined result of the dependency jobs.
        callback: oneshot::Sender<object_rainbow::Result<()>>,
    },
    /// Asks the loop to insert one fully described object into the map.
    End {
        /// Hash the object is stored under.
        hash: Hash,
        /// Taken from the `tags` field of the object's diff hashes.
        tags_hash: Hash,
        /// Taken from the `mangle` field of the object's diff hashes.
        mangle_hash: Hash,
        /// Hashes of the points visited while traversing the object, in visit order.
        topology: Vec<Hash>,
        /// Bytes produced by the object's `vec()`.
        data: Vec<u8>,
        /// Receives the result of the map insertion.
        callback: oneshot::Sender<object_rainbow::Result<()>>,
    },
}
32
/// Handle through which an object being saved talks to the request loop.
struct Context<'r> {
    /// Channel on which [`Request`]s are submitted.
    request: &'r Sender<Request>,
}
36
/// Visitor that records the points an object refers to.
struct DependencyVisitor<'v> {
    /// One save job per distinct point hash; only the first point seen for a hash is kept.
    dependencies: &'v mut BTreeMap<Hash, Dependency>,
    /// Hash of every visited point, in visit order (repeats included).
    topology: &'v mut Vec<Hash>,
}
41
42impl<'v> PointVisitor for DependencyVisitor<'v> {
43 fn visit<T: Traversible>(&mut self, point: &(impl 'static + SingularFetch<T = T> + Clone)) {
44 if let btree_map::Entry::Vacant(e) = self.dependencies.entry(point.hash()) {
45 let point = point.clone();
46 e.insert(Box::new(move |context| {
47 Box::pin(async move { context.save_object(&point.fetch().await?).await })
48 }));
49 }
50 self.topology.push(point.hash());
51 }
52}
53
54impl<'r> Context<'r> {
55 async fn save_object(&self, object: &impl Traversible) -> object_rainbow::Result<()> {
56 let mut dependencies = BTreeMap::new();
57 let mut topology = Vec::new();
58 object.traverse(&mut DependencyVisitor {
59 dependencies: &mut dependencies,
60 topology: &mut topology,
61 });
62 {
63 let (callback, wait) = oneshot::channel();
64 self.request
65 .send_async(Request::Depencencies {
66 dependencies,
67 callback,
68 })
69 .await
70 .ok();
71 let Ok(r) = wait.await else {
72 return Err(object_rainbow::Error::Interrupted);
73 };
74 r?;
75 }
76 {
77 let (callback, wait) = oneshot::channel();
78 let diff = object.diff_hashes();
79 let diff_hash = diff.data_hash();
80 let data = object.vec();
81 let data_hash = data.data_hash();
82 let hashes = ObjectHashes {
83 diff: diff_hash,
84 data: data_hash,
85 };
86 let full_hash = hashes.data_hash();
87 self.request
88 .send_async(Request::End {
89 hash: full_hash,
90 tags_hash: diff.tags,
91 mangle_hash: diff.mangle,
92 topology,
93 data,
94 callback,
95 })
96 .await
97 .ok();
98 let Ok(r) = wait.await else {
99 return Err(object_rainbow::Error::Interrupted);
100 };
101 r?;
102 }
103 Ok(())
104 }
105}
106
/// Saves `object` and everything reachable from it into a fresh [`LocalMap`].
///
/// A request loop owns the map and serves [`Request`]s produced by
/// [`Context::save_object`]: dependency batches are turned into tasks (one save per
/// distinct hash), and finished objects are inserted into the map.
///
/// # Errors
///
/// Returns whatever error the root `save_object` call ends with.
///
/// # Panics
///
/// Panics if an object is submitted for insertion under a hash the map already contains,
/// or if the map does not contain `object.full_hash()` once the root save has succeeded.
pub async fn fetchall(object: &impl Traversible) -> object_rainbow::Result<LocalMap> {
    let mut map = LocalMap::new();
    {
        // hash -> (receiver for that save's result, handle of the task running the save).
        let mut started = BTreeMap::new();
        // Zero-capacity channel carrying requests from `save_object` calls to the loop.
        let (send, recv) = flume::bounded(0);
        // `outer` runs dependency saves and their waiters; `inner` runs the request loop
        // and the root save.
        let outer = Executor::new();
        let inner = Executor::new();
        let task =
            inner.spawn(async {
                while let Ok(request) = recv.recv_async().await {
                    match request {
                        Request::Depencencies {
                            dependencies,
                            callback,
                        } => {
                            let mut tasks = Vec::new();
                            for (hash, save) in dependencies {
                                let recv = match started.entry(hash) {
                                    btree_map::Entry::Vacant(e) => {
                                        // First request for this hash: start the save.
                                        // `send` here is still the request channel; it
                                        // is shadowed by the result channel just below.
                                        let future = save(Context { request: &send });
                                        let (send, recv) = flume::bounded(1);
                                        let task = outer.spawn(async move {
                                            send.send_async(future.await).await.ok();
                                        });
                                        e.insert((recv, task)).0.clone()
                                    }
                                    // Already started: only wait on its result channel.
                                    btree_map::Entry::Occupied(e) => e.get().0.clone(),
                                };
                                // NOTE(review): the save sends a single result, while the
                                // receiver is cloned for each waiter. Presumably only one
                                // waiter observes that result and any other falls back to
                                // `Ok(())` here - confirm this is intended.
                                tasks.push(outer.spawn(async move {
                                    recv.recv_async().await.unwrap_or(Ok(()))
                                }));
                            }
                            // Reply off the loop so it can keep serving requests: the
                            // waiters are awaited in order and the first error wins.
                            outer
                                .spawn(async move {
                                    for task in tasks {
                                        if let Err(e) = task.await {
                                            callback.send(Err(e)).ok();
                                            return;
                                        }
                                    }
                                    callback.send(Ok(())).ok();
                                })
                                .detach();
                        }
                        Request::End {
                            hash,
                            tags_hash,
                            mangle_hash,
                            topology,
                            data,
                            callback,
                        } => {
                            // Each hash is expected to be inserted at most once.
                            assert!(!map.contains(hash));
                            callback
                                .send(map.insert(hash, tags_hash, mangle_hash, topology, data))
                                .ok();
                        }
                    }
                }
            });
        // Drive `outer` from within `inner` until the request loop finishes; the handle is
        // held in `_task` for the rest of this scope.
        let _task = inner.spawn(outer.run(task));
        // Run the root save on `inner`; an error here is returned to the caller.
        inner
            .run(Context { request: &send }.save_object(object))
            .await?;
    }
    assert!(map.contains(object.full_hash()));
    Ok(map)
}