noosphere_cli/native/render/renderer.rs
1use anyhow::Result;
2use noosphere_core::context::HasSphereContext;
3use noosphere_storage::Storage;
4use std::{collections::BTreeSet, marker::PhantomData, sync::Arc, thread::available_parallelism};
5
6use tokio::{select, task::JoinSet};
7
8use super::{SphereRenderJob, SphereRenderRequest};
9use crate::native::{
10 paths::SpherePaths,
11 render::{JobKind, SphereRenderJobId},
12};
13
/// Render depth used by [SphereRenderer::render] when the caller does not
/// specify a depth and no previously rendered depth can be read back from the
/// workspace.
const DEFAULT_RENDER_DEPTH: u32 = 3;
15
/// [SphereRenderer] embodies all of the work needed to render a sphere graph to
/// a workspace file system location. Starting from the "root" sphere, the
/// renderer will efficiently queue work to concurrently render the sphere graph
/// up to a maximum depth.
///
/// The renderer produces a file system structure that is approximated by this
/// diagram:
///
/// ```sh
/// /workspace_root/
/// ├── foo.subtext
/// ├── bar/
/// │   └── baz.subtext
/// ├── @my-peer/ -> ./.sphere/peers/bafyabc...x987
/// ├── @other-peer/ -> ./.sphere/peers/bafyabc...y654
/// └── .sphere/
///     ├── identity # The sphere ID of the root sphere
///     ├── version  # Last rendered version of the root sphere
///     ├── depth    # Last rendered depth
///     ├── slugs    # Backward mapping of slugs to files; base64-encoded to escape
///     │   │        # special characters that may occur in a slug (such as '/')
///     │   ├── Zm9v -> ../../foo.subtext
///     │   └── YmFyL2Jheg -> ../../bar/baz.subtext
///     ├── storage/ # Storage folder distinguishes the root sphere
///     │   └── ...  # Implementation-specific e.g., Sled will have its own DB structure
///     ├── content/ # Hard links to content that appears in peer spheres
///     │   ├── bafyabc...a123
///     │   ├── bafyabc...b456
///     │   ├── bafyabc...c789
///     │   └── ...
///     └── peers/
///         ├── bafyabc...x987/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record  # A valid link record for this peer at this version
///         │   └── mount/       # The virtual root where a peer's files and links to their
///         │       │            # peers are rendered
///         │       ├── their-foo.subtext -> ../../../content/bafyabc...b456
///         │       ├── @peer3 -> ../../../peers/bafyabc...y654/mount
///         │       └── @peer4 -> ../../../peers/bafyabc...z321/mount
///         ├── bafyabc...y654/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record
///         │   └── mount/
///         │       └── ...
///         ├── bafyabc...z321/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record
///         │   └── mount/
///         │       └── ...
///         └── ...
/// ```
///
/// Peers throughout the graph are rendered into a flat structure. Each version
/// of a peer sphere gets its own unique directory, and the "mount" subfolder
/// therein contains a virtual file system representation of that sphere's
/// contents and peers. The word "virtual" is used because all content and
/// spheres within the mount are represented as symlinks. This enables maximum
/// re-use of content across revisions of peers over time.
///
/// Note that since peers may re-appear in address books at different depths
/// of graph traversal, it's possible to appear to have rendered more deeply
/// than the "maximum" render depth (when in fact an already-rendered peer is
/// just being re-used).
pub struct SphereRenderer<C, S>
where
    C: HasSphereContext<S> + 'static,
    S: Storage + 'static,
{
    /// Context of the root sphere; a clone is handed to every render job that
    /// the renderer spawns
    context: C,
    /// Workspace paths (root, depth file, peers directory, ...) that the
    /// render is performed against
    paths: Arc<SpherePaths>,
    /// Ties the renderer to the storage type `S` without holding a value of it
    storage_type: PhantomData<S>,
}
92
93impl<C, S> SphereRenderer<C, S>
94where
95 C: HasSphereContext<S> + 'static,
96 S: Storage + 'static,
97{
98 /// Construct a [SphereRenderer] for the given root [HasSphereContext] and
99 /// initialized [SpherePaths].
100 pub fn new(context: C, paths: Arc<SpherePaths>) -> Self {
101 SphereRenderer {
102 context,
103 paths,
104 storage_type: PhantomData,
105 }
106 }
107
108 /// Render the sphere graph up to the given depth; the renderer will attempt
109 /// to render different edges from the root concurrently, efficiently and
110 /// idempotently. If the specified render depth increases for a subsequent
111 /// render, all rendered peers will be reset and rendered again (although
112 /// the hard links to their content will remain unchanged).
113 #[instrument(level = "debug", skip(self))]
114 pub async fn render(&self, depth: Option<u32>, force_full_render: bool) -> Result<()> {
115 std::env::set_current_dir(self.paths.root())?;
116
117 let mut render_jobs = JoinSet::<Result<()>>::new();
118 let mut started_jobs = BTreeSet::<SphereRenderJobId>::new();
119
120 let max_parallel_jobs = available_parallelism()?.get();
121 let (tx, mut rx) = tokio::sync::mpsc::channel::<SphereRenderRequest>(max_parallel_jobs);
122
123 let last_render_depth =
124 if let Ok(depth) = tokio::fs::read_to_string(self.paths.depth()).await {
125 depth.parse::<u32>().ok()
126 } else {
127 None
128 };
129
130 let render_depth = if let Some(depth) = depth {
131 depth
132 } else {
133 last_render_depth.unwrap_or(DEFAULT_RENDER_DEPTH)
134 };
135
136 let force_render_peers = if force_full_render {
137 true
138 } else if let Some(last_render_depth) = last_render_depth {
139 render_depth > last_render_depth
140 } else {
141 false
142 };
143
144 if force_render_peers {
145 // NOTE: Sequencing is important here. This reset is performed
146 // by the renderer in advance of queuing any work because we
147 // cannot guarantee the order in which requests to render peers
148 // may come in, and it could happen out-of-order with a "refresh
149 // peers" job that is running concurrently.
150 self.reset_peers().await?;
151
152 debug!(
153 ?max_parallel_jobs,
154 ?render_depth,
155 "Spawning peer refresh render job for {}...",
156 self.context.identity().await?
157 );
158
159 render_jobs.spawn(
160 SphereRenderJob::new(
161 self.context.clone(),
162 JobKind::RefreshPeers,
163 self.paths.clone(),
164 Vec::new(),
165 tx.clone(),
166 )
167 .render(),
168 );
169 }
170
171 debug!(
172 ?max_parallel_jobs,
173 ?render_depth,
174 "Spawning root render job for {}...",
175 self.context.identity().await?
176 );
177
178 // Spawn the root job
179 render_jobs.spawn(
180 SphereRenderJob::new(
181 self.context.clone(),
182 JobKind::Root { force_full_render },
183 self.paths.clone(),
184 Vec::new(),
185 tx.clone(),
186 )
187 .render(),
188 );
189
190 let mut job_queue_open = true;
191
192 while !render_jobs.is_empty() && job_queue_open {
193 select! {
194 result = render_jobs.join_next() => {
195 if let Some(join_result) = result {
196 match join_result {
197 Err(error) => {
198 warn!("Failed to join render job task: {}", error);
199 },
200 Ok(task_result) => match task_result {
201 Ok(_) => (),
202 Err(error) => warn!("Render job failed: {}", error),
203 }
204 }
205 }
206 },
207 next_job_request = rx.recv() => {
208 match next_job_request {
209 None => {
210 job_queue_open = false;
211 }
212 Some(job_request) => {
213 let job_id = job_request.as_id();
214
215 if started_jobs.contains(&job_id) {
216 debug!("A render job for {} @ {} has already been queued, skipping...", job_id.0, job_id.1);
217 continue;
218 }
219
220 let SphereRenderRequest(petname_path, peer, version, link_record) = job_request;
221
222 // NOTE: If a peer is too deep, we _don't_ mark it as started; another
223 // peer may wish to render this peer at a shallower depth, in which case
224 // we should proceed.
225 if petname_path.len() > render_depth as usize {
226 debug!("Skipping render job for '@{}' (exceeds max render depth {render_depth})", petname_path.join("."));
227 continue;
228 }
229
230 debug!(?petname_path, "Queuing render job for {} @ {}...", job_id.0, job_id.1);
231
232 started_jobs.insert(job_id);
233
234 if self.paths.peer(&peer, &version).exists() {
235 // TODO(#559): We may need to re-render if a previous
236 // render was incomplete for some reason
237 debug!(
238 "Content for {} @ {} is already rendered, skipping...",
239 peer, version
240 );
241 continue;
242 }
243
244 debug!("Spawning render job for {peer} @ {version}...");
245
246 render_jobs.spawn(
247 SphereRenderJob::new(
248 self.context.clone(),
249 JobKind::Peer(peer, version, link_record),
250 self.paths.clone(),
251 petname_path,
252 tx.clone()
253 ).render()
254 );
255 }
256 }
257 }
258 }
259 }
260
261 tokio::fs::write(self.paths.depth(), render_depth.to_string()).await?;
262
263 Ok(())
264 }
265
266 async fn reset_peers(&self) -> Result<()> {
267 if let Err(error) = tokio::fs::remove_dir_all(self.paths.peers()).await {
268 warn!(
269 path = ?self.paths.peers(),
270 "Failed attempt to reset peers: {}", error
271 );
272 }
273
274 tokio::fs::create_dir_all(self.paths.peers()).await?;
275
276 Ok(())
277 }
278}