noosphere_cli/native/render/job.rs

//! Constructs related to performing specific rendering tasks

use anyhow::{anyhow, Result};
use cid::Cid;
use noosphere_core::context::{
    HasSphereContext, SphereContentRead, SphereCursor, SpherePetnameRead, SphereReplicaRead,
    SphereWalker,
};
use noosphere_core::data::{Did, Link, LinkRecord, MemoIpld};
use noosphere_storage::Storage;
use std::{marker::PhantomData, sync::Arc};
use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;

use crate::native::{paths::SpherePaths, render::ChangeBuffer};

use super::writer::SphereWriter;

const CONTENT_CHANGE_BUFFER_CAPACITY: usize = 512;
const PETNAME_CHANGE_BUFFER_CAPACITY: usize = 2048;

/// A pairing of [Did] and [Cid], suitable for uniquely identifying the work
/// needed to render a specific sphere at a specific version, regardless of
/// relative location in the graph
pub type SphereRenderJobId = (Did, Cid);

/// A request to render a sphere, originating from a given path through the
/// graph.
pub struct SphereRenderRequest(pub Vec<String>, pub Did, pub Cid, pub LinkRecord);

impl SphereRenderRequest {
    /// Get the [SphereRenderJobId] that corresponds to this request
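    ///
    /// A minimal usage sketch (marked `ignore`): it assumes a `did`, `cid` and
    /// `link_record` are already in hand, as constructing them is out of scope
    /// here.
    ///
    /// ```ignore
    /// let request = SphereRenderRequest(vec!["alice".into()], did.clone(), cid, link_record);
    /// let (peer_identity, peer_version) = request.as_id();
    /// assert_eq!((peer_identity, peer_version), (did, cid));
    /// ```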
    pub fn as_id(&self) -> SphereRenderJobId {
        (self.1.clone(), self.2)
    }
}

/// The kind of render work to be performed by a [SphereRenderJob]. The work
/// implied by each variant overlaps considerably, but the specific results
/// differ significantly.
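///
/// For example, a root render that skips the incremental path even when the
/// sphere has been rendered before (marked `ignore` because module paths are
/// elided):
///
/// ```ignore
/// let kind = JobKind::Root {
///     force_full_render: true,
/// };
/// ```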
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum JobKind {
    /// A job that renders the root sphere
    Root {
        /// Force a full render even if an incremental render would otherwise
        /// be performed
        force_full_render: bool,
    },
    /// A job that renders a peer (or peer-of-a-peer) of the root sphere
    Peer(Did, Cid, LinkRecord),
    /// A job that renders _just_ the peers of the root sphere
    RefreshPeers,
}

/// A [SphereRenderJob] encapsulates the state needed to perform some discrete
/// [JobKind], and implements the render path that corresponds to that
/// [JobKind]. It is designed to run concurrently or in parallel with an
/// arbitrary number of other [SphereRenderJob]s that are associated with the
/// local sphere workspace.
pub struct SphereRenderJob<C, S>
where
    C: HasSphereContext<S> + 'static,
    S: Storage + 'static,
{
    context: C,
    kind: JobKind,
    petname_path: Vec<String>,
    writer: SphereWriter,
    storage_type: PhantomData<S>,
    job_queue: Sender<SphereRenderRequest>,
}

impl<C, S> SphereRenderJob<C, S>
where
    C: HasSphereContext<S> + 'static,
    S: Storage + 'static,
{
    /// Construct a new render job of the given [JobKind], using the provided
    /// [HasSphereContext] and [SpherePaths] to perform rendering.
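    ///
    /// A usage sketch (marked `ignore`): it assumes an existing sphere
    /// `context`, initialized `paths` (an `Arc<SpherePaths>`), and an `mpsc`
    /// channel whose receiver is drained elsewhere to dispatch follow-up peer
    /// render jobs.
    ///
    /// ```ignore
    /// let (tx, _rx) = tokio::sync::mpsc::channel(1024);
    /// let job = SphereRenderJob::new(
    ///     context,
    ///     JobKind::Root {
    ///         force_full_render: false,
    ///     },
    ///     paths.clone(),
    ///     Vec::new(),
    ///     tx,
    /// );
    /// job.render().await?;
    /// ```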
    pub fn new(
        context: C,
        kind: JobKind,
        paths: Arc<SpherePaths>,
        petname_path: Vec<String>,
        job_queue: Sender<SphereRenderRequest>,
    ) -> Self {
        SphereRenderJob {
            context,
            petname_path,
            writer: SphereWriter::new(kind.clone(), paths),
            kind,
            storage_type: PhantomData,
            job_queue,
        }
    }

    /// Entrypoint to render based on the [SphereRenderJob] configuration
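    /// Depending on the job's [JobKind], this performs a full render of the
    /// root sphere, an incremental render since the last rendered version, a
    /// full render of a traversed peer, or a refresh of peer symlinks only.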
    #[instrument(level = "debug", skip(self))]
    pub async fn render(self) -> Result<()> {
        match self.kind {
            JobKind::Root { force_full_render } => {
                info!("Rendering this sphere...");
                match (
                    force_full_render,
                    tokio::fs::try_exists(self.paths().version()).await,
                ) {
                    (false, Ok(true)) => {
                        debug!("Root has been rendered at least once; rendering incrementally...");
                        let version = Cid::try_from(
                            tokio::fs::read_to_string(self.paths().version()).await?,
                        )?;
                        self.incremental_render(&version.into()).await?;
                    }
                    _ => {
                        if force_full_render {
                            debug!("Root full render is being forced...");
                        } else {
                            debug!("Root has not been rendered yet; performing a full render...");
                        }
                        self.full_render(SphereCursor::latest(self.context.clone()))
                            .await?
                    }
                }
            }
            JobKind::Peer(_, _, _) => {
                info!("Rendering @{}...", self.petname_path.join("."));
                if let Some(context) = SphereCursor::latest(self.context.clone())
                    .traverse_by_petnames(&self.petname_path)
                    .await?
                {
                    self.full_render(context).await?;
                } else {
                    return Err(anyhow!("No peer found at {}", self.petname_path.join(".")));
                };
            }
            JobKind::RefreshPeers => {
                debug!("Running refresh peers render job...");
                self.refresh_peers(SphereCursor::latest(self.context.clone()))
                    .await?;
            }
        }

        Ok(())
    }

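    /// The [SpherePaths] used by this job's [SphereWriter]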
    fn paths(&self) -> &SpherePaths {
        self.writer.paths()
    }

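    /// Render all content and peers of the sphere at `cursor` to the
    /// workspace, then record the identity, version and link record that were
    /// rendered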
    #[instrument(level = "debug", skip(self, cursor))]
    async fn full_render(&self, cursor: SphereCursor<C, S>) -> Result<()> {
        let identity = cursor.identity().await?;
        let version = cursor.version().await?;

        debug!("Starting full render of {identity} @ {version}...");

        {
            let content_stream = SphereWalker::from(&cursor).into_content_stream();

            tokio::pin!(content_stream);

            let mut content_change_buffer = ChangeBuffer::new(CONTENT_CHANGE_BUFFER_CAPACITY);

            // Write all content
            while let Some((slug, file)) = content_stream.try_next().await? {
                content_change_buffer.add(slug, file)?;

                if content_change_buffer.is_full() {
                    content_change_buffer.flush_to_writer(&self.writer).await?;
                }
            }

            content_change_buffer.flush_to_writer(&self.writer).await?;
        }

        self.refresh_peers(cursor).await?;

        // Write out the identity and latest version that were rendered, along
        // with the link record
        tokio::try_join!(
            self.writer.write_identity_and_version(&identity, &version),
            self.writer.write_link_record()
        )?;

        Ok(())
    }

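    /// Walk the address book of the sphere at `cursor`, writing a symlink for
    /// every peer whose link record resolves to a version and queuing a
    /// [SphereRenderRequest] so that each such peer is eventually rendered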
    #[instrument(level = "debug", skip(self, cursor))]
    async fn refresh_peers(&self, cursor: SphereCursor<C, S>) -> Result<()> {
        let petname_stream = SphereWalker::from(&cursor).into_petname_stream();
        let db = cursor.sphere_context().await?.db().clone();

        let mut petname_change_buffer = ChangeBuffer::new(PETNAME_CHANGE_BUFFER_CAPACITY);

        tokio::pin!(petname_stream);

        // Write all peer symlinks, queuing jobs to render them as we go
        while let Some((name, identity)) = petname_stream.try_next().await? {
            let did = identity.did.clone();
            let (link_record, cid) = match identity.link_record(&db).await {
                Some(link_record) => {
                    if let Some(cid) = link_record.get_link() {
                        (link_record, cid)
                    } else {
                        debug!("No version resolved for '@{name}', skipping...");
                        continue;
                    }
                }
                None => {
                    debug!("No link record found for '@{name}', skipping...");
                    continue;
                }
            };

            // Create a symlink to each peer (they will be rendered later, if
            // they haven't been already)
            petname_change_buffer.add(name.clone(), (did.clone(), cid.into()))?;

            if petname_change_buffer.is_full() {
                petname_change_buffer.flush_to_writer(&self.writer).await?;
            }

            // Ensure the peer is queued to be rendered (redundant jobs are
            // deduplicated by the receiver)
            let mut petname_path = vec![name];
            petname_path.append(&mut self.petname_path.clone());
            self.job_queue
                .send(SphereRenderRequest(
                    petname_path,
                    did,
                    cid.into(),
                    link_record,
                ))
                .await?;
        }

        petname_change_buffer.flush_to_writer(&self.writer).await?;

        Ok(())
    }

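    /// Render only the content slugs and petnames that have changed since the
    /// given version, then record the latest identity and version that were
    /// rendered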
    #[instrument(level = "debug", skip(self))]
    async fn incremental_render(&self, since: &Link<MemoIpld>) -> Result<()> {
        let content_change_stream =
            SphereWalker::from(&self.context).into_content_change_stream(Some(since));
        let mut cursor = SphereCursor::latest(self.context.clone());
        let mut content_change_buffer = ChangeBuffer::new(CONTENT_CHANGE_BUFFER_CAPACITY);

        tokio::pin!(content_change_stream);

        while let Some((version, changes)) = content_change_stream.try_next().await? {
            cursor.mount_at(&version).await?;

            for slug in changes {
                trace!(version = ?version, slug = ?slug, "Buffering change...");
                match cursor.read(&slug).await? {
                    Some(file) => content_change_buffer.add(slug, file)?,
                    None => content_change_buffer.remove(&slug)?,
                }

                if content_change_buffer.is_full() {
                    content_change_buffer.flush_to_writer(&self.writer).await?;
                }
            }
        }

        content_change_buffer.flush_to_writer(&self.writer).await?;

        let petname_change_stream =
            SphereWalker::from(&self.context).into_petname_change_stream(Some(since));
        let mut petname_change_buffer = ChangeBuffer::new(PETNAME_CHANGE_BUFFER_CAPACITY);

        tokio::pin!(petname_change_stream);

        while let Some((version, changes)) = petname_change_stream.try_next().await? {
            cursor.mount_at(&version).await?;

            for petname in changes {
                match cursor.get_petname(&petname).await? {
                    Some(identity) => match cursor.get_petname_record(&petname).await? {
                        Some(link_record) => {
                            if let Some(version) = link_record.get_link() {
                                petname_change_buffer
                                    .add(petname.clone(), (identity.clone(), Cid::from(version)))?;

                                let mut petname_path = self.petname_path.clone();
                                petname_path.push(petname);
                                self.job_queue
                                    .send(SphereRenderRequest(
                                        petname_path,
                                        identity,
                                        Cid::from(version),
                                        link_record,
                                    ))
                                    .await?;
                            } else {
                                petname_change_buffer.remove(&petname)?;
                            }
                        }
                        None => petname_change_buffer.remove(&petname)?,
                    },
                    None => petname_change_buffer.remove(&petname)?,
                }

                if petname_change_buffer.is_full() {
                    petname_change_buffer.flush_to_writer(&self.writer).await?;
                }
            }
        }

        petname_change_buffer.flush_to_writer(&self.writer).await?;

        // Write out the latest version that was rendered
        let identity = cursor.identity().await?;
        let version = cursor.version().await?;

        self.writer
            .write_identity_and_version(&identity, &version)
            .await?;

        Ok(())
    }
}