noosphere_cli/native/render/renderer.rs

use anyhow::Result;
use noosphere_core::context::HasSphereContext;
use noosphere_storage::Storage;
use std::{collections::BTreeSet, marker::PhantomData, sync::Arc, thread::available_parallelism};

use tokio::{select, task::JoinSet};
use tracing::{debug, instrument, warn};

use super::{SphereRenderJob, SphereRenderRequest};
use crate::native::{
    paths::SpherePaths,
    render::{JobKind, SphereRenderJobId},
};

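/// Default maximum render depth, used when no depth is requested and no depth
/// was recorded by a previous render.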
const DEFAULT_RENDER_DEPTH: u32 = 3;

/// [SphereRenderer] embodies all of the work needed to render a sphere graph to
/// a workspace file system location. Starting from the "root" sphere, the
/// renderer will efficiently queue work to concurrently render the sphere graph
/// up to a maximum depth.
///
/// The renderer produces a file system structure that is approximated by this
/// diagram:
///
/// ```sh
/// /workspace_root/
/// ├── foo.subtext
/// ├── bar/
/// │   └── baz.subtext
/// ├── @my-peer/ -> ./.sphere/peers/bafyabc...x987
/// ├── @other-peer/ -> ./.sphere/peers/bafyabc...y654
/// └── .sphere/
///     ├── identity # The sphere ID of the root sphere
///     ├── version  # Last rendered version of the root sphere
///     ├── depth    # Last rendered depth
///     ├── slugs    # Backward mapping of slugs to files; base64-encoded to escape
///     │   │        # special characters that may occur in a slug (such as '/')
///     │   ├── Zm9v -> ../../foo.subtext
///     │   └── YmFyL2Jheg -> ../../bar/baz.subtext
///     ├── storage/ # Storage folder distinguishes the root sphere
///     │   └── ...  # Implementation-specific, e.g., Sled will have its own DB structure
///     ├── content/ # Hard links to content that appears in peer spheres
///     │   ├── bafyabc...a123
///     │   ├── bafyabc...b456
///     │   ├── bafyabc...c789
///     │   └── ...
///     └── peers/
///         ├── bafyabc...x987/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record  # A valid link record for this peer at this version
///         │   └── mount/       # The virtual root where a peer's files and links to their
///         │       │            # peers are rendered
///         │       ├── their-foo.subtext -> ../../../content/bafyabc...b456
///         │       ├── @peer3 -> ../../../peers/bafyabc...y654/mount
///         │       └── @peer4 -> ../../../peers/bafyabc...z321/mount
///         ├── bafyabc...y654/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record
///         │   └── mount/
///         │       └── ...
///         ├── bafyabc...z321/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record
///         │   └── mount/
///         │       └── ...
///         └── ...
/// ```
///
/// Peers throughout the graph are rendered into a flat structure. Each version
/// of a peer sphere gets its own unique directory, and the "mount" subfolder
/// therein contains a virtual file system representation of that sphere's
/// contents and peers. The word "virtual" is used because all content and
/// spheres within the mount are represented as symlinks. This enables maximum
/// re-use of content across revisions of peers over time.
///
/// Note that since peers may re-appear in address books at different depths
/// of graph traversal, it's possible to appear to have rendered more deeply
/// than the "maximum" render depth (when in fact an already-rendered peer is
/// just being re-used).
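///
/// A minimal usage sketch (assuming a [HasSphereContext] implementation
/// `sphere_context` and an initialized [SpherePaths] `sphere_paths` are
/// already in hand; obtaining them is out of scope here):
///
/// ```ignore
/// let renderer = SphereRenderer::new(sphere_context, sphere_paths.clone());
///
/// // Render the graph up to a depth of 2 without forcing a full re-render:
/// renderer.render(Some(2), false).await?;
/// ```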
pub struct SphereRenderer<C, S>
where
    C: HasSphereContext<S> + 'static,
    S: Storage + 'static,
{
    context: C,
    paths: Arc<SpherePaths>,
    storage_type: PhantomData<S>,
}

impl<C, S> SphereRenderer<C, S>
where
    C: HasSphereContext<S> + 'static,
    S: Storage + 'static,
{
    /// Construct a [SphereRenderer] for the given root [HasSphereContext] and
    /// initialized [SpherePaths].
    pub fn new(context: C, paths: Arc<SpherePaths>) -> Self {
        SphereRenderer {
            context,
            paths,
            storage_type: PhantomData,
        }
    }

    /// Render the sphere graph up to the given depth; the renderer will attempt
    /// to render different edges from the root concurrently, efficiently and
    /// idempotently. If the specified render depth increases for a subsequent
    /// render, all rendered peers will be reset and rendered again (although
    /// the hard links to their content will remain unchanged).
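    ///
    /// For example, if the last render used a depth of 2, a subsequent
    /// `render(Some(3), false)` resets and re-renders all peers, whereas
    /// `render(Some(1), false)` or `render(None, false)` leaves
    /// already-rendered peers in place.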
    #[instrument(level = "debug", skip(self))]
    pub async fn render(&self, depth: Option<u32>, force_full_render: bool) -> Result<()> {
        std::env::set_current_dir(self.paths.root())?;

        let mut render_jobs = JoinSet::<Result<()>>::new();
        let mut started_jobs = BTreeSet::<SphereRenderJobId>::new();

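        // Render jobs send requests for newly discovered peers back over this
        // channel; its capacity is tied to the number of available CPU cores.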
        let max_parallel_jobs = available_parallelism()?.get();
        let (tx, mut rx) = tokio::sync::mpsc::channel::<SphereRenderRequest>(max_parallel_jobs);

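        // The depth of the previous render is persisted in the workspace; it
        // supplies the default depth when none is requested and determines
        // whether peers must be re-rendered from scratch.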
        let last_render_depth =
            if let Ok(depth) = tokio::fs::read_to_string(self.paths.depth()).await {
                depth.parse::<u32>().ok()
            } else {
                None
            };

        let render_depth = if let Some(depth) = depth {
            depth
        } else {
            last_render_depth.unwrap_or(DEFAULT_RENDER_DEPTH)
        };

        let force_render_peers = if force_full_render {
            true
        } else if let Some(last_render_depth) = last_render_depth {
            render_depth > last_render_depth
        } else {
            false
        };

        if force_render_peers {
            // NOTE: Sequencing is important here. This reset is performed
            // by the renderer in advance of queuing any work because we
            // cannot guarantee the order in which requests to render peers
            // may come in, and it could happen out-of-order with a "refresh
            // peers" job that is running concurrently.
            self.reset_peers().await?;

            debug!(
                ?max_parallel_jobs,
                ?render_depth,
                "Spawning peer refresh render job for {}...",
                self.context.identity().await?
            );

            render_jobs.spawn(
                SphereRenderJob::new(
                    self.context.clone(),
                    JobKind::RefreshPeers,
                    self.paths.clone(),
                    Vec::new(),
                    tx.clone(),
                )
                .render(),
            );
        }

        debug!(
            ?max_parallel_jobs,
            ?render_depth,
            "Spawning root render job for {}...",
            self.context.identity().await?
        );

        // Spawn the root job
        render_jobs.spawn(
            SphereRenderJob::new(
                self.context.clone(),
                JobKind::Root { force_full_render },
                self.paths.clone(),
                Vec::new(),
                tx.clone(),
            )
            .render(),
        );

        let mut job_queue_open = true;

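        // Drive spawned render jobs to completion while accepting follow-up
        // render requests; the loop ends once every job has finished or the
        // request channel closes.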
        while !render_jobs.is_empty() && job_queue_open {
            select! {
                result = render_jobs.join_next() => {
                    if let Some(join_result) = result {
                        match join_result {
                            Err(error) => {
                                warn!("Failed to join render job task: {}", error);
                            },
                            Ok(task_result) => match task_result {
                                Ok(_) => (),
                                Err(error) => warn!("Render job failed: {}", error),
                            }
                        }
                    }
                },
                next_job_request = rx.recv() => {
                    match next_job_request {
                        None => {
                            job_queue_open = false;
                        }
                        Some(job_request) => {
                            let job_id = job_request.as_id();

                            if started_jobs.contains(&job_id) {
                                debug!("A render job for {} @ {} has already been queued, skipping...", job_id.0, job_id.1);
                                continue;
                            }

                            let SphereRenderRequest(petname_path, peer, version, link_record) = job_request;

                            // NOTE: If a peer is too deep, we _don't_ mark it as started; another
                            // peer may wish to render this peer at a shallower depth, in which case
                            // we should proceed.
                            if petname_path.len() > render_depth as usize {
                                debug!("Skipping render job for '@{}' (exceeds max render depth {render_depth})", petname_path.join("."));
                                continue;
                            }

                            debug!(?petname_path, "Queuing render job for {} @ {}...", job_id.0, job_id.1);

                            started_jobs.insert(job_id);

                            if self.paths.peer(&peer, &version).exists() {
                                // TODO(#559): We may need to re-render if a previous
                                // render was incomplete for some reason
                                debug!(
                                    "Content for {} @ {} is already rendered, skipping...",
                                    peer, version
                                );
                                continue;
                            }

                            debug!("Spawning render job for {peer} @ {version}...");

                            render_jobs.spawn(
                                SphereRenderJob::new(
                                    self.context.clone(),
                                    JobKind::Peer(peer, version, link_record),
                                    self.paths.clone(),
                                    petname_path,
                                    tx.clone()
                                ).render()
                            );
                        }
                    }
                }
            }
        }

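        // Persist the depth used for this render so that the next render can
        // detect whether the depth has increased.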
        tokio::fs::write(self.paths.depth(), render_depth.to_string()).await?;

        Ok(())
    }

    async fn reset_peers(&self) -> Result<()> {
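        // Failing to remove the peers directory is non-fatal (it may simply not
        // exist yet, as on a first render); log a warning and continue.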
        if let Err(error) = tokio::fs::remove_dir_all(self.paths.peers()).await {
            warn!(
                path = ?self.paths.peers(),
                "Failed attempt to reset peers: {}", error
            );
        }

        tokio::fs::create_dir_all(self.paths.peers()).await?;

        Ok(())
    }
}