noosphere_cli/native/render/
job.rs1use anyhow::{anyhow, Result};
4use cid::Cid;
5use noosphere_core::context::{
6 HasSphereContext, SphereContentRead, SphereCursor, SpherePetnameRead, SphereReplicaRead,
7 SphereWalker,
8};
9use noosphere_core::data::{Did, Link, LinkRecord, MemoIpld};
10use noosphere_storage::Storage;
11use std::{marker::PhantomData, sync::Arc};
12use tokio::sync::mpsc::Sender;
13use tokio_stream::StreamExt;
14
15use crate::native::{paths::SpherePaths, render::ChangeBuffer};
16
17use super::writer::SphereWriter;
18
/// Capacity handed to the [`ChangeBuffer`] that accumulates content changes.
/// The render loops flush the buffer to the writer whenever it reports full.
const CONTENT_CHANGE_BUFFER_CAPACITY: usize = 512;
/// Capacity handed to the [`ChangeBuffer`] that accumulates petname changes.
/// The render loops flush the buffer to the writer whenever it reports full.
const PETNAME_CHANGE_BUFFER_CAPACITY: usize = 2048;
21
/// The `(Did, Cid)` pair produced by [`SphereRenderRequest::as_id`].
// NOTE(review): presumably used to de-duplicate queued render jobs — confirm
// against the code that consumes the job queue.
pub type SphereRenderJobId = (Did, Cid);
26
27pub struct SphereRenderRequest(pub Vec<String>, pub Did, pub Cid, pub LinkRecord);
30
31impl SphereRenderRequest {
32 pub fn as_id(&self) -> SphereRenderJobId {
34 (self.1.clone(), self.2)
35 }
36}
37
/// The kinds of work a [`SphereRenderJob`] can be asked to perform.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum JobKind {
    /// Render the root sphere, i.e. the one reachable directly through the
    /// job's own context.
    Root {
        /// When `true`, a full render is performed even if a previously
        /// rendered version is recorded on disk.
        force_full_render: bool,
    },
    /// Fully render the peer found by traversing the job's petname path.
    ///
    /// NOTE(review): the payload is presumably the peer's identity, the
    /// version to render and its link record (mirroring
    /// [`SphereRenderRequest`]); `render` itself does not read it — confirm
    /// against `SphereWriter`.
    Peer(Did, Cid, LinkRecord),
    /// Only walk the petnames of the latest version of the sphere, rewriting
    /// them and queueing a render request for every peer that resolves.
    RefreshPeers,
}
53
/// A single unit of render work: it renders one sphere (the root or a peer)
/// through a [`SphereWriter`], and queues follow-up [`SphereRenderRequest`]s
/// for the peers it discovers along the way.
pub struct SphereRenderJob<C, S>
where
    C: HasSphereContext<S> + 'static,
    S: Storage + 'static,
{
    /// Context used to build [`SphereCursor`]s and [`SphereWalker`]s.
    context: C,
    /// What this job renders; see [`JobKind`].
    kind: JobKind,
    /// Petname path from the root sphere to the sphere being rendered.
    // NOTE(review): presumably empty for root jobs — confirm with the caller.
    petname_path: Vec<String>,
    /// Sink that receives every buffered change and the final metadata.
    writer: SphereWriter,
    /// Ties the otherwise unused storage type parameter `S` to the struct.
    storage_type: PhantomData<S>,
    /// Channel on which render requests for discovered peers are sent.
    job_queue: Sender<SphereRenderRequest>,
}
71
72impl<C, S> SphereRenderJob<C, S>
73where
74 C: HasSphereContext<S> + 'static,
75 S: Storage + 'static,
76{
77 pub fn new(
80 context: C,
81 kind: JobKind,
82 paths: Arc<SpherePaths>,
83 petname_path: Vec<String>,
84 job_queue: Sender<SphereRenderRequest>,
85 ) -> Self {
86 SphereRenderJob {
87 context,
88 petname_path,
89 writer: SphereWriter::new(kind.clone(), paths),
90 kind,
91 storage_type: PhantomData,
92 job_queue,
93 }
94 }
95
96 #[instrument(level = "debug", skip(self))]
98 pub async fn render(self) -> Result<()> {
99 match self.kind {
100 JobKind::Root { force_full_render } => {
101 info!("Rendering this sphere...");
102 match (
103 force_full_render,
104 tokio::fs::try_exists(self.paths().version()).await,
105 ) {
106 (false, Ok(true)) => {
107 debug!("Root has been rendered at least once; rendering incrementally...");
108 let version = Cid::try_from(
109 tokio::fs::read_to_string(self.paths().version()).await?,
110 )?;
111 self.incremental_render(&version.into()).await?;
112 }
113 _ => {
114 if force_full_render {
115 debug!("Root full render is being forced...");
116 } else {
117 debug!("Root has not been rendered yet; performing a full render...");
118 }
119 self.full_render(SphereCursor::latest(self.context.clone()))
120 .await?
121 }
122 }
123 }
124 JobKind::Peer(_, _, _) => {
125 info!("Rendering @{}...", self.petname_path.join("."));
126 if let Some(context) = SphereCursor::latest(self.context.clone())
127 .traverse_by_petnames(&self.petname_path)
128 .await?
129 {
130 self.full_render(context).await?;
131 } else {
132 return Err(anyhow!("No peer found at {}", self.petname_path.join(".")));
133 };
134 }
135 JobKind::RefreshPeers => {
136 debug!("Running refresh peers render job...");
137 self.refresh_peers(SphereCursor::latest(self.context.clone()))
138 .await?;
139 }
140 }
141
142 Ok(())
143 }
144
145 fn paths(&self) -> &SpherePaths {
146 self.writer.paths()
147 }
148
149 #[instrument(level = "debug", skip(self, cursor))]
150 async fn full_render(&self, cursor: SphereCursor<C, S>) -> Result<()> {
151 let identity = cursor.identity().await?;
152 let version = cursor.version().await?;
153
154 debug!("Starting full render of {identity} @ {version}...");
155
156 {
157 let content_stream = SphereWalker::from(&cursor).into_content_stream();
158
159 tokio::pin!(content_stream);
160
161 let mut content_change_buffer = ChangeBuffer::new(CONTENT_CHANGE_BUFFER_CAPACITY);
162
163 while let Some((slug, file)) = content_stream.try_next().await? {
165 content_change_buffer.add(slug, file)?;
166
167 if content_change_buffer.is_full() {
168 content_change_buffer.flush_to_writer(&self.writer).await?;
169 }
170 }
171
172 content_change_buffer.flush_to_writer(&self.writer).await?;
173 }
174
175 self.refresh_peers(cursor).await?;
176
177 tokio::try_join!(
179 self.writer.write_identity_and_version(&identity, &version),
180 self.writer.write_link_record()
181 )?;
182
183 Ok(())
184 }
185
186 #[instrument(level = "debug", skip(self, cursor))]
187 async fn refresh_peers(&self, cursor: SphereCursor<C, S>) -> Result<()> {
188 let petname_stream = SphereWalker::from(&cursor).into_petname_stream();
189 let db = cursor.sphere_context().await?.db().clone();
190
191 let mut petname_change_buffer = ChangeBuffer::new(PETNAME_CHANGE_BUFFER_CAPACITY);
192
193 tokio::pin!(petname_stream);
194
195 while let Some((name, identity)) = petname_stream.try_next().await? {
197 let did = identity.did.clone();
198 let (link_record, cid) = match identity.link_record(&db).await {
199 Some(link_record) => {
200 if let Some(cid) = link_record.get_link() {
201 (link_record, cid)
202 } else {
203 debug!("No version resolved for '@{name}', skipping...");
204 continue;
205 }
206 }
207 None => {
208 debug!("No link record found for '@{name}', skipping...");
209 continue;
210 }
211 };
212
213 petname_change_buffer.add(name.clone(), (did.clone(), cid.into()))?;
216
217 if petname_change_buffer.is_full() {
218 petname_change_buffer.flush_to_writer(&self.writer).await?;
219 }
220
221 let mut petname_path = vec![name];
224 petname_path.append(&mut self.petname_path.clone());
225 self.job_queue
226 .send(SphereRenderRequest(
227 petname_path,
228 did,
229 cid.into(),
230 link_record,
231 ))
232 .await?;
233 }
234
235 petname_change_buffer.flush_to_writer(&self.writer).await?;
236
237 Ok(())
238 }
239
240 #[instrument(level = "debug", skip(self))]
241 async fn incremental_render(&self, since: &Link<MemoIpld>) -> Result<()> {
242 let content_change_stream =
243 SphereWalker::from(&self.context).into_content_change_stream(Some(since));
244 let mut cursor = SphereCursor::latest(self.context.clone());
245 let mut content_change_buffer = ChangeBuffer::new(CONTENT_CHANGE_BUFFER_CAPACITY);
246
247 tokio::pin!(content_change_stream);
248
249 while let Some((version, changes)) = content_change_stream.try_next().await? {
250 cursor.mount_at(&version).await?;
251
252 for slug in changes {
253 trace!(version = ?version, slug = ?slug, "Buffering change...");
254 match cursor.read(&slug).await? {
255 Some(file) => content_change_buffer.add(slug, file)?,
256 None => content_change_buffer.remove(&slug)?,
257 }
258
259 if content_change_buffer.is_full() {
260 content_change_buffer.flush_to_writer(&self.writer).await?;
261 }
262 }
263 }
264
265 content_change_buffer.flush_to_writer(&self.writer).await?;
266
267 let petname_change_stream =
268 SphereWalker::from(&self.context).into_petname_change_stream(Some(since));
269 let mut petname_change_buffer = ChangeBuffer::new(PETNAME_CHANGE_BUFFER_CAPACITY);
270
271 tokio::pin!(petname_change_stream);
272
273 while let Some((version, changes)) = petname_change_stream.try_next().await? {
274 cursor.mount_at(&version).await?;
275
276 for petname in changes {
277 match cursor.get_petname(&petname).await? {
278 Some(identity) => match cursor.get_petname_record(&petname).await? {
279 Some(link_record) => {
280 if let Some(version) = link_record.get_link() {
281 petname_change_buffer
282 .add(petname.clone(), (identity.clone(), Cid::from(version)))?;
283
284 let mut petname_path = self.petname_path.clone();
285 petname_path.push(petname);
286 self.job_queue
287 .send(SphereRenderRequest(
288 petname_path,
289 identity,
290 Cid::from(version),
291 link_record,
292 ))
293 .await?;
294 } else {
295 petname_change_buffer.remove(&petname)?;
296 }
297 }
298 None => petname_change_buffer.remove(&petname)?,
299 },
300 None => petname_change_buffer.remove(&petname)?,
301 }
302
303 if petname_change_buffer.is_full() {
304 petname_change_buffer.flush_to_writer(&self.writer).await?;
305 }
306 }
307 }
308
309 petname_change_buffer.flush_to_writer(&self.writer).await?;
310
311 let identity = cursor.identity().await?;
313 let version = cursor.version().await?;
314
315 self.writer
316 .write_identity_and_version(&identity, &version)
317 .await?;
318
319 Ok(())
320 }
321}