use anyhow::Result;
use noosphere_core::context::HasSphereContext;
use noosphere_storage::Storage;
use std::{
    collections::BTreeSet, marker::PhantomData, sync::Arc, thread::available_parallelism,
};
use tokio::{select, task::JoinSet};

use super::{SphereRenderJob, SphereRenderRequest};
use crate::native::{
    paths::SpherePaths,
    render::{JobKind, SphereRenderJobId},
};

const DEFAULT_RENDER_DEPTH: u32 = 3;

/// [SphereRenderer] embodies all of the work needed to render a sphere graph to
/// a workspace file system location. Starting from the "root" sphere, the
/// renderer will efficiently queue work to concurrently render the sphere graph
/// up to a maximum depth.
///
/// The renderer produces a file system structure that is approximated by this
/// diagram:
///
/// ```sh
/// /workspace_root/
/// ├── foo.subtext
/// ├── bar/
/// │   └── baz.subtext
/// ├── @my-peer/ -> ./.sphere/peers/bafyabc...x987
/// ├── @other-peer/ -> ./.sphere/peers/bafyabc...y654
/// └── .sphere/
///     ├── identity # The sphere ID of the root sphere
///     ├── version  # Last rendered version of the root sphere
///     ├── depth    # Last rendered depth
///     ├── slugs    # Backward mapping of slugs to files; base64-encoded to escape
///     │   │        # special characters that may occur in a slug (such as '/')
///     │   ├── Zm9v -> ../../foo.subtext
///     │   └── YmFyL2Jheg -> ../../bar/baz.subtext
///     ├── storage/ # Storage folder distinguishes the root sphere
///     │   └── ...  # Implementation-specific e.g., Sled will have its own DB structure
///     ├── content/ # Hard links to content that appears in peer spheres
///     │   ├── bafyabc...a123
///     │   ├── bafyabc...b456
///     │   ├── bafyabc...c789
///     │   └── ...
///     └── peers/
///         ├── bafyabc...x987/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record  # A valid link record for this peer at this version
///         │   └── mount/       # The virtual root where a peer's files and links to their
///         │       │            # peers are rendered
///         │       ├── their-foo.subtext -> ../../../content/bafyabc...b456
///         │       ├── @peer3 -> ../../../peers/bafyabc...y654/mount
///         │       └── @peer4 -> ../../../peers/bafyabc...z321/mount
///         ├── bafyabc...y654/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record
///         │   └── mount/
///         │       └── ...
///         ├── bafyabc...z321/
///         │   ├── identity
///         │   ├── version
///         │   ├── link_record
///         │   └── mount/
///         │       └── ...
///         └── ...
/// ```
///
/// Peers throughout the graph are rendered into a flat structure. Each version
/// of a peer sphere gets its own unique directory, and the "mount" subfolder
/// therein contains a virtual file system representation of that sphere's
/// contents and peers. The word "virtual" is used because all content and
/// spheres within the mount are represented as symlinks. This enables maximum
/// re-use of content across revisions of peers over time.
///
/// Note that since peers may re-appear in address books at different depths
/// of graph traversal, it's possible to appear to have rendered more deeply
/// than the "maximum" render depth (when in fact an already-rendered peer is
/// just being re-used).
pub struct SphereRenderer<C, S>
where
    C: HasSphereContext<S> + 'static,
    S: Storage + 'static,
{
    context: C,
    paths: Arc<SpherePaths>,
    storage_type: PhantomData<S>,
}

impl<C, S> SphereRenderer<C, S>
where
    C: HasSphereContext<S> + 'static,
    S: Storage + 'static,
{
    /// Construct a [SphereRenderer] for the given root [HasSphereContext] and
    /// initialized [SpherePaths].
    pub fn new(context: C, paths: Arc<SpherePaths>) -> Self {
        SphereRenderer {
            context,
            paths,
            storage_type: PhantomData,
        }
    }

    /// Render the sphere graph up to the given depth; the renderer will attempt
    /// to render different edges from the root concurrently, efficiently and
    /// idempotently. If no depth is given, the last rendered depth is used,
    /// falling back to [DEFAULT_RENDER_DEPTH] when no previous depth was
    /// recorded. If the specified render depth increases for a subsequent
    /// render, or if `force_full_render` is set, all rendered peers will be
    /// reset and rendered again (although the hard links to their content will
    /// remain unchanged).
    #[instrument(level = "debug", skip(self))]
    pub async fn render(&self, depth: Option<u32>, force_full_render: bool) -> Result<()> {
        std::env::set_current_dir(self.paths.root())?;

        let mut render_jobs = JoinSet::<Result<()>>::new();
        let mut started_jobs = BTreeSet::<SphereRenderJobId>::new();

        let max_parallel_jobs = available_parallelism()?.get();
        let (tx, mut rx) = tokio::sync::mpsc::channel::<SphereRenderRequest>(max_parallel_jobs);

        let last_render_depth =
            if let Ok(depth) = tokio::fs::read_to_string(self.paths.depth()).await {
                depth.parse::<u32>().ok()
            } else {
                None
            };

        let render_depth = if let Some(depth) = depth {
            depth
        } else {
            last_render_depth.unwrap_or(DEFAULT_RENDER_DEPTH)
        };

        let force_render_peers = if force_full_render {
            true
        } else if let Some(last_render_depth) = last_render_depth {
            render_depth > last_render_depth
        } else {
            false
        };

        if force_render_peers {
            // NOTE: Sequencing is important here. This reset is performed
            // by the renderer in advance of queuing any work because we
            // cannot guarantee the order in which requests to render peers
            // may come in, and it could happen out-of-order with a "refresh
            // peers" job that is running concurrently.
            self.reset_peers().await?;

            debug!(
                ?max_parallel_jobs,
                ?render_depth,
                "Spawning peer refresh render job for {}...",
                self.context.identity().await?
            );

            render_jobs.spawn(
                SphereRenderJob::new(
                    self.context.clone(),
                    JobKind::RefreshPeers,
                    self.paths.clone(),
                    Vec::new(),
                    tx.clone(),
                )
                .render(),
            );
        }

        debug!(
            ?max_parallel_jobs,
            ?render_depth,
            "Spawning root render job for {}...",
            self.context.identity().await?
        );

        // Spawn the root job
        render_jobs.spawn(
            SphereRenderJob::new(
                self.context.clone(),
                JobKind::Root { force_full_render },
                self.paths.clone(),
                Vec::new(),
                tx.clone(),
            )
            .render(),
        );

        let mut job_queue_open = true;

        while !render_jobs.is_empty() && job_queue_open {
            select! {
                result = render_jobs.join_next() => {
                    if let Some(join_result) = result {
                        match join_result {
                            Err(error) => {
                                warn!("Failed to join render job task: {}", error);
                            },
                            Ok(task_result) => match task_result {
                                Ok(_) => (),
                                Err(error) => warn!("Render job failed: {}", error),
                            }
                        }
                    }
                },
                next_job_request = rx.recv() => {
                    match next_job_request {
                        None => {
                            job_queue_open = false;
                        }
                        Some(job_request) => {
                            let job_id = job_request.as_id();

                            if started_jobs.contains(&job_id) {
                                debug!("A render job for {} @ {} has already been queued, skipping...", job_id.0, job_id.1);
                                continue;
                            }

                            let SphereRenderRequest(petname_path, peer, version, link_record) = job_request;

                            // NOTE: If a peer is too deep, we _don't_ mark it as started; another
                            // peer may wish to render this peer at a shallower depth, in which case
                            // we should proceed.
                            if petname_path.len() > render_depth as usize {
                                debug!("Skipping render job for '@{}' (exceeds max render depth {render_depth})", petname_path.join("."));
                                continue;
                            }

                            debug!(?petname_path, "Queuing render job for {} @ {}...", job_id.0, job_id.1);

                            started_jobs.insert(job_id);

                            if self.paths.peer(&peer, &version).exists() {
                                // TODO(#559): We may need to re-render if a previous
                                // render was incomplete for some reason
                                debug!(
                                    "Content for {} @ {} is already rendered, skipping...",
                                    peer, version
                                );
                                continue;
                            }

                            debug!("Spawning render job for {peer} @ {version}...");

                            render_jobs.spawn(
                                SphereRenderJob::new(
                                    self.context.clone(),
                                    JobKind::Peer(peer, version, link_record),
                                    self.paths.clone(),
                                    petname_path,
                                    tx.clone()
                                ).render()
                            );
                        }
                    }
                }
            }
        }

        tokio::fs::write(self.paths.depth(), render_depth.to_string()).await?;

        Ok(())
    }

    async fn reset_peers(&self) -> Result<()> {
        if let Err(error) = tokio::fs::remove_dir_all(self.paths.peers()).await {
            warn!(
                path = ?self.paths.peers(),
                "Failed attempt to reset peers: {}", error
            );
        }

        tokio::fs::create_dir_all(self.paths.peers()).await?;

        Ok(())
    }
}