Skip to main content

object_rainbow_fetchall/
lib.rs

1use std::{
2    collections::{BTreeMap, btree_map},
3    pin::Pin,
4};
5
6use async_executor::Executor;
7use flume::Sender;
8use futures_channel::oneshot;
9use object_rainbow::{Hash, ObjectHashes, PointVisitor, SingularFetch, ToOutput, Traversible};
10use object_rainbow_local_map::LocalMap;
11
12type Dependency = Box<
13    dyn 'static
14        + Send
15        + FnOnce(Context<'_>) -> Pin<Box<dyn '_ + Send + Future<Output = object_rainbow::Result<()>>>>,
16>;
17
18enum Request {
19    Depencencies {
20        dependencies: BTreeMap<Hash, Dependency>,
21        callback: oneshot::Sender<object_rainbow::Result<()>>,
22    },
23    End {
24        hash: Hash,
25        tags_hash: Hash,
26        mangle_hash: Hash,
27        topology: Vec<Hash>,
28        data: Vec<u8>,
29        callback: oneshot::Sender<object_rainbow::Result<()>>,
30    },
31}
32
33struct Context<'r> {
34    request: &'r Sender<Request>,
35}
36
37struct DependencyVisitor<'v> {
38    dependencies: &'v mut BTreeMap<Hash, Dependency>,
39    topology: &'v mut Vec<Hash>,
40}
41
42impl<'v> PointVisitor for DependencyVisitor<'v> {
43    fn visit<T: Traversible>(&mut self, point: &(impl 'static + SingularFetch<T = T> + Clone)) {
44        if let btree_map::Entry::Vacant(e) = self.dependencies.entry(point.hash()) {
45            let point = point.clone();
46            e.insert(Box::new(move |context| {
47                Box::pin(async move { context.save_object(&point.fetch().await?).await })
48            }));
49        }
50        self.topology.push(point.hash());
51    }
52}
53
54impl<'r> Context<'r> {
55    async fn save_object(&self, object: &impl Traversible) -> object_rainbow::Result<()> {
56        let mut dependencies = BTreeMap::new();
57        let mut topology = Vec::new();
58        object.traverse(&mut DependencyVisitor {
59            dependencies: &mut dependencies,
60            topology: &mut topology,
61        });
62        {
63            let (callback, wait) = oneshot::channel();
64            self.request
65                .send_async(Request::Depencencies {
66                    dependencies,
67                    callback,
68                })
69                .await
70                .ok();
71            let Ok(r) = wait.await else {
72                return Err(object_rainbow::Error::Interrupted);
73            };
74            r?;
75        }
76        {
77            let (callback, wait) = oneshot::channel();
78            let diff = object.diff_hashes();
79            let diff_hash = diff.data_hash();
80            let data = object.vec();
81            let data_hash = data.data_hash();
82            let hashes = ObjectHashes {
83                diff: diff_hash,
84                data: data_hash,
85            };
86            let full_hash = hashes.data_hash();
87            self.request
88                .send_async(Request::End {
89                    hash: full_hash,
90                    tags_hash: diff.tags,
91                    mangle_hash: diff.mangle,
92                    topology,
93                    data,
94                    callback,
95                })
96                .await
97                .ok();
98            let Ok(r) = wait.await else {
99                return Err(object_rainbow::Error::Interrupted);
100            };
101            r?;
102        }
103        Ok(())
104    }
105}
106
107pub async fn fetchall(object: &impl Traversible) -> object_rainbow::Result<LocalMap> {
108    let mut map = LocalMap::new();
109    {
110        let mut started = BTreeMap::new();
111        let (send, recv) = flume::bounded(0);
112        let outer = Executor::new();
113        let inner = Executor::new();
114        let task =
115            inner.spawn(async {
116                while let Ok(request) = recv.recv_async().await {
117                    match request {
118                        Request::Depencencies {
119                            dependencies,
120                            callback,
121                        } => {
122                            let mut tasks = Vec::new();
123                            for (hash, save) in dependencies {
124                                let recv = match started.entry(hash) {
125                                    btree_map::Entry::Vacant(e) => {
126                                        let future = save(Context { request: &send });
127                                        let (send, recv) = flume::bounded(1);
128                                        let task = outer.spawn(async move {
129                                            send.send_async(future.await).await.ok();
130                                        });
131                                        e.insert((recv, task)).0.clone()
132                                    }
133                                    btree_map::Entry::Occupied(e) => e.get().0.clone(),
134                                };
135                                tasks.push(outer.spawn(async move {
136                                    recv.recv_async().await.unwrap_or(Ok(()))
137                                }));
138                            }
139                            outer
140                                .spawn(async move {
141                                    for task in tasks {
142                                        if let Err(e) = task.await {
143                                            callback.send(Err(e)).ok();
144                                            return;
145                                        }
146                                    }
147                                    callback.send(Ok(())).ok();
148                                })
149                                .detach();
150                        }
151                        Request::End {
152                            hash,
153                            tags_hash,
154                            mangle_hash,
155                            topology,
156                            data,
157                            callback,
158                        } => {
159                            assert!(!map.contains(hash));
160                            callback
161                                .send(map.insert(hash, tags_hash, mangle_hash, topology, data))
162                                .ok();
163                        }
164                    }
165                }
166            });
167        let _task = inner.spawn(outer.run(task));
168        inner
169            .run(Context { request: &send }.save_object(object))
170            .await?;
171    }
172    assert!(map.contains(object.full_hash()));
173    Ok(map)
174}