Skip to main content

radicle_job/
lib.rs

1//! Radicle "job COB".
2//!
3//! A [`Job`] records results of automated processing of a repository for a
4//! given Git commit, by one or more nodes.
5//!
6//! For any one of these processes there is a corresponding [`Run`]. For
7//! example, nodes that run CI for a repository might add `Run` to a `Job` for a
8//! specific commit. If there are several nodes running CI, they would each add
9//! their own `Run` to the same `Job`.
10//!
11//! # Example
12//!
13//! ```
14//! # use radicle::crypto::Signer;
15//! # use radicle::git::{raw::Repository, Oid};
16//! # use radicle::test;
17//! # use url::Url;
18//! # use uuid::Uuid;
19//! #
20//! # use radicle_job::{Jobs, Run, Runs, Reason};
21//! #
22//! # fn commit(repo: &Repository) -> Oid {
23//! #     let tree = {
24//! #         let tree = repo.treebuilder(None).unwrap();
25//! #         let oid = tree.write().unwrap();
26//! #         repo.find_tree(oid).unwrap()
27//! #     };
28//! #
29//! #     let author = repo.signature().unwrap();
30//! #     repo.commit(None, &author, &author, "Test Commit", &tree, &[])
31//! #         .unwrap()
32//! #         .into()
33//! # }
34//! #
35//! # let test::setup::NodeWithRepo {
36//! #     node: alice, repo, ..
37//! # } = test::setup::NodeWithRepo::default();
38//! # let oid = commit(&repo.backend);
39//! # let repo = (&*repo).clone();
40//! let mut jobs = Jobs::open(repo).unwrap();
41//! #
42//! # let test::setup::NodeWithRepo { node: bob, .. } = test::setup::NodeWithRepo::default();
43//! let mut job = jobs.create(oid, &alice.signer).unwrap();
44//! #
45//! let uuid = Uuid::new_v4();
46//! let log = Url::parse(&format!("https://example.com/ci/logs?run={}", uuid)).unwrap();
47//! job.run(uuid, log, &alice.signer).unwrap();
48//! ```
49
50#![deny(missing_docs)]
51
52use std::collections::HashMap;
53use std::fmt;
54use std::ops::{Deref, DerefMut};
55use std::str::FromStr;
56
57use indexmap::IndexMap;
58use once_cell::sync::Lazy;
59use radicle::cob::store::Cob;
60use radicle::cob::{self, store, EntryId, Evaluate, ObjectId, Op, TypeName};
61use radicle::crypto;
62use radicle::crypto::signature::Signer;
63use radicle::node::device::Device;
64use radicle::node::NodeId;
65use radicle::prelude::ReadRepository;
66use radicle::storage::{RepositoryError, SignRepository, WriteRepository};
67use radicle::{cob::store::CobAction, git::Oid};
68use serde::{Deserialize, Serialize};
69use url::Url;
70use uuid::Uuid;
71
72pub mod display;
73pub mod error;
74
75/// Type name of a patch.
76pub static TYPENAME: Lazy<TypeName> =
77    Lazy::new(|| FromStr::from_str("xyz.radworks.job").expect("type name is valid"));
78
79/// The identifier for a given [`Job`] collaborative object.
80///
81/// When a [`Job`] is created, through [`Jobs::create`], the identifier is
82/// also returned as part of [`JobMut::id`].
83///
84/// Identifiers can be used to retrieve a [`Job`] or [`JobMut`] through
85/// [`Jobs::get`] and [`Jobs::get_mut`], respectively.
86#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
87pub struct JobId(ObjectId);
88
89impl JobId {
90    fn as_object_id(&self) -> &ObjectId {
91        &self.0
92    }
93}
94
95impl fmt::Display for JobId {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        f.write_str(&self.0.to_string())
98    }
99}
100
101impl FromStr for JobId {
102    type Err = <ObjectId as FromStr>::Err;
103
104    fn from_str(s: &str) -> Result<Self, Self::Err> {
105        ObjectId::from_str(s).map(Self)
106    }
107}
108
109impl From<JobId> for ObjectId {
110    fn from(JobId(oid): JobId) -> Self {
111        oid
112    }
113}
114
115impl From<ObjectId> for JobId {
116    fn from(oid: ObjectId) -> Self {
117        Self(oid)
118    }
119}
120
121/// A `Job` describes a generic task run for a given commit [`Oid`] by a set of
122/// nodes.
123///
124/// A node may run the task many times, which are accumulated in its [`Runs`]
125/// set. Each [`Run`] is identified by a [`Uuid`].
126#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
127pub struct Job {
128    oid: Oid,
129    runs: HashMap<NodeId, Runs>,
130}
131
132/// A set of [`Run`]s identified by a [`Uuid`].
133///
134/// Iteration over this set is guaranteed to come in insertion order.
135#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
136pub struct Runs(IndexMap<Uuid, Run>);
137
138impl Runs {
139    /// Insert a new [`Run`], identified by the given [`Uuid`].
140    pub fn insert(&mut self, uuid: Uuid, run: Run) -> Option<Run> {
141        self.0.insert(uuid, run)
142    }
143
144    /// Check that the set of runs contains the given [`Uuid`].
145    pub fn contains_key(&self, uuid: &Uuid) -> bool {
146        self.0.contains_key(uuid)
147    }
148
149    /// Get the [`Run`] identifier by the given [`Uuid`].
150    ///
151    /// Return `None` if the `Uuid` does not exist.`
152    pub fn get(&self, uuid: &Uuid) -> Option<&Run> {
153        self.0.get(uuid)
154    }
155
156    /// Get the `nth` [`Run`] of the set.
157    pub fn get_index(&self, nth: usize) -> Option<(&Uuid, &Run)> {
158        self.0.get_index(nth)
159    }
160
161    /// Get the latest [`Run`] and its corresponding [`Uuid`].
162    pub fn latest(&self) -> Option<(&Uuid, &Run)> {
163        self.0.iter().next_back()
164    }
165
166    /// Get all [`Run`]s that have started [`Status`].
167    pub fn started(&self) -> Runs {
168        self.iter()
169            .filter_map(|(uuid, run)| run.is_started().then_some((*uuid, run.clone())))
170            .collect()
171    }
172
173    /// Get all [`Run`]s that have finished [`Status`].
174    pub fn finished(&self) -> Runs {
175        self.iter()
176            .filter_map(|(uuid, run)| run.is_finished().then_some((*uuid, run.clone())))
177            .collect()
178    }
179
180    /// Get all [`Run`]s that have finished [`Status`] and have the succeeded
181    /// [`Reason`].
182    pub fn succeeded(&self) -> Runs {
183        self.iter()
184            .filter_map(|(uuid, run)| run.succeeded().then_some((*uuid, run.clone())))
185            .collect()
186    }
187
188    /// Get all [`Run`]s that have finished [`Status`] and have the failed
189    /// [`Reason`].
190    pub fn failed(&self) -> Runs {
191        self.iter()
192            .filter_map(|(uuid, run)| run.failed().then_some((*uuid, run.clone())))
193            .collect()
194    }
195
196    /// Partition all [`Run`]s into started, succeeded, and failed.
197    pub fn partition(&self) -> (Runs, Runs, Runs) {
198        let mut started = IndexMap::new();
199        let mut succeeded = IndexMap::new();
200        let mut failed = IndexMap::new();
201
202        for (uuid, run) in self.0.iter() {
203            match run.status {
204                Status::Started => started.insert(*uuid, run.clone()),
205                Status::Finished(Reason::Succeeded) => succeeded.insert(*uuid, run.clone()),
206                Status::Finished(Reason::Failed) => failed.insert(*uuid, run.clone()),
207            };
208        }
209        (Runs(started), Runs(succeeded), Runs(failed))
210    }
211
212    /// Check is the set of [`Runs`] empty.
213    pub fn is_empty(&self) -> bool {
214        self.0.is_empty()
215    }
216
217    /// Get the number of [`Runs`] in the set.
218    pub fn len(&self) -> usize {
219        self.0.len()
220    }
221
222    /// Iterate over the [`Run`]s and their corresponding [`Uuid`]s.
223    ///
224    /// The order of the iteration is guaranteed to be insertion order.
225    pub fn iter(&self) -> impl Iterator<Item = (&Uuid, &Run)> {
226        self.0.iter()
227    }
228}
229
230impl FromIterator<(Uuid, Run)> for Runs {
231    fn from_iter<T: IntoIterator<Item = (Uuid, Run)>>(iter: T) -> Self {
232        Self(iter.into_iter().collect())
233    }
234}
235
236impl<'a> IntoIterator for &'a Runs {
237    type Item = (&'a Uuid, &'a Run);
238    type IntoIter = indexmap::map::Iter<'a, Uuid, Run>;
239
240    fn into_iter(self) -> Self::IntoIter {
241        self.0.iter()
242    }
243}
244
245impl IntoIterator for Runs {
246    type Item = (Uuid, Run);
247    type IntoIter = indexmap::map::IntoIter<Uuid, Run>;
248
249    fn into_iter(self) -> Self::IntoIter {
250        self.0.into_iter()
251    }
252}
253
254/// The collaborative object actions that are used for Radicle COB operations.
255#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
256pub enum Action {
257    /// Request a [`Job`] be created for the given [`Oid`].
258    ///
259    /// Note this is expected only once for initializing the [`Job`]. Every
260    /// other `Request` for the same `Oid` will be ignored.
261    Request {
262        /// The commit the [`Job`] corresponds to.
263        oid: Oid,
264    },
265    /// Notify that the node has started a [`Run`].
266    Run {
267        /// The [`Uuid`] that identifies this particular [`Run`] of the node.
268        uuid: Uuid,
269        /// The [`Url`] where the node will log any information or data.
270        log: Url,
271    },
272    /// Notify that the node has finished a [`Run`].
273    Finished {
274        /// The [`Uuid`] that identifies the [`Run`] that is finished.
275        uuid: Uuid,
276        /// The [`Reason`] which the node finished with.
277        reason: Reason,
278    },
279}
280
281impl CobAction for Action {
282    fn parents(&self) -> Vec<radicle::git::Oid> {
283        match self {
284            Action::Request { oid } => vec![*oid],
285            _ => Vec::new(),
286        }
287    }
288}
289
290impl Job {
291    /// Construct a new [`Job`].
292    fn new(oid: Oid) -> Self {
293        Self {
294            oid,
295            runs: HashMap::new(),
296        }
297    }
298
299    /// Get the [`Oid`] this [`Job`] is running tasks for.
300    pub fn oid(&self) -> &Oid {
301        &self.oid
302    }
303
304    /// Get all the nodes that have started, but not finished, [`Runs`].
305    pub fn started(&self) -> HashMap<NodeId, Runs> {
306        self.filter_map_by(|runs| runs.started())
307    }
308
309    /// Get all the nodes that have started, and finished, [`Runs`].
310    pub fn finished(&self) -> HashMap<NodeId, Runs> {
311        self.filter_map_by(|runs| runs.finished())
312    }
313
314    /// Get all the nodes that have succeeded [`Runs`].
315    pub fn succeeded(&self) -> HashMap<NodeId, Runs> {
316        self.filter_map_by(|runs| runs.succeeded())
317    }
318
319    /// Get all the nodes that have failed [`Runs`].
320    pub fn failed(&self) -> HashMap<NodeId, Runs> {
321        self.filter_map_by(|runs| runs.failed())
322    }
323
324    /// Get all nodes' started, succeeded, and failed runs – respectively.
325    pub fn partition(&self) -> HashMap<NodeId, (Runs, Runs, Runs)> {
326        self.runs
327            .iter()
328            .map(|(node, runs)| (*node, runs.partition()))
329            .collect()
330    }
331
332    /// Get the latest [`Run`] of the given [`NodeId`].
333    pub fn latest_of(&self, node: &NodeId) -> Option<(&Uuid, &Run)> {
334        self.runs
335            .get(node)
336            .and_then(|runs| runs.0.iter().next_back())
337    }
338
339    /// Get the latest [`Run`]s of all [`NodeId`]s.
340    pub fn latest(&self) -> impl Iterator<Item = (&NodeId, &Uuid, &Run)> + '_ {
341        self.runs
342            .iter()
343            .filter_map(|(node, runs)| runs.latest().map(|(uuid, run)| (node, uuid, run)))
344    }
345
346    /// Get the raw `HashMap` of the node runs.
347    pub fn runs(&self) -> &HashMap<NodeId, Runs> {
348        &self.runs
349    }
350
351    /// Get the [`Runs`] of a given [`NodeId`].
352    pub fn runs_of(&self, node: &NodeId) -> Option<&Runs> {
353        self.runs.get(node)
354    }
355
356    fn filter_map_by<P>(&self, p: P) -> HashMap<NodeId, Runs>
357    where
358        P: Fn(&Runs) -> Runs,
359    {
360        self.runs
361            .iter()
362            .filter_map(|(node, runs)| {
363                let runs = p(runs);
364                (!runs.is_empty()).then_some((*node, runs))
365            })
366            .collect()
367    }
368
369    fn insert(&mut self, node: NodeId, uuid: Uuid, run: Run) -> bool {
370        let runs = self.runs.entry(node).or_default();
371        if runs.contains_key(&uuid) {
372            false
373        } else {
374            runs.insert(uuid, run);
375            true
376        }
377    }
378
379    fn update(
380        &mut self,
381        node: NodeId,
382        uuid: Uuid,
383        reason: Reason,
384        timestamp: cob::Timestamp,
385    ) -> bool {
386        let Some(runs) = self.runs.get_mut(&node) else {
387            return false;
388        };
389        let mut updated = false;
390        runs.0.entry(uuid).and_modify(|run| {
391            updated = true;
392            *run = run.clone().finish(reason, timestamp);
393        });
394        updated
395    }
396
397    fn action(&mut self, node: NodeId, action: Action, timestamp: cob::Timestamp) {
398        match action {
399            // Cannot request for another `oid`, so we ignore any superfluous
400            // request actions
401            Action::Request { .. } => {}
402            Action::Run { uuid, log } => {
403                self.insert(node, uuid, Run::new(log, timestamp));
404            }
405            Action::Finished { uuid, reason } => {
406                self.update(node, uuid, reason, timestamp);
407            }
408        }
409    }
410}
411
412impl store::CobWithType for Job {
413    fn type_name() -> &'static TypeName {
414        &TYPENAME
415    }
416}
417
418impl store::Cob for Job {
419    type Action = Action;
420    type Error = error::Build;
421
422    fn from_root<R: ReadRepository>(op: Op<Self::Action>, repo: &R) -> Result<Self, Self::Error> {
423        let mut actions = op.actions.into_iter();
424        let Some(Action::Request { oid }) = actions.next() else {
425            return Err(error::Build::Initial);
426        };
427        repo.commit(oid)
428            .map_err(|err| error::Build::MissingCommit { oid, err })?;
429        let mut runs = Self::new(oid);
430        for action in actions {
431            runs.action(op.author, action, op.timestamp);
432        }
433        Ok(runs)
434    }
435
436    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a radicle::cob::Entry>>(
437        &mut self,
438        op: Op<Self::Action>,
439        _concurrent: I,
440        _repo: &R,
441    ) -> Result<(), Self::Error> {
442        for action in op.actions {
443            self.action(op.author, action, op.timestamp);
444        }
445        Ok(())
446    }
447}
448
449impl<R: ReadRepository> Evaluate<R> for Job {
450    type Error = error::Apply;
451
452    fn init(entry: &radicle::cob::Entry, store: &R) -> Result<Self, Self::Error> {
453        let op = Op::try_from(entry)?;
454        let object = Job::from_root(op, store)?;
455        Ok(object)
456    }
457
458    fn apply<'a, I: Iterator<Item = (&'a Oid, &'a radicle::cob::Entry)>>(
459        &mut self,
460        entry: &radicle::cob::Entry,
461        concurrent: I,
462        store: &R,
463    ) -> Result<(), Self::Error> {
464        let op = Op::try_from(entry)?;
465        self.op(op, concurrent.map(|(_, e)| e), store)
466            .map_err(error::Apply::from)
467    }
468}
469
470/// A `Run` represents a task run for a [`Job`]. A `Run` is initially created in
471/// with a [`Status::Started`], before processing has started.
472///
473/// The `Run` can then be marked as [`Status::Finished`] with a given
474/// [`Reason`].
475///
476/// The `Run` also contains a [`Url`] so that any extra metadata, for example
477/// logs, can be tracked outside of the `Run` itself.
478#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
479pub struct Run {
480    /// The status of the run.
481    ///
482    /// A run can be in one of three states:
483    ///   - Started
484    ///   - Succeeded – implies that it has finished
485    ///   - Failed – implies that it has finished
486    status: Status,
487    /// The [`Url`] of the [`Run`] where information is logged by the node.
488    log: Url,
489    /// The timestamp of the last operation to affect the run.
490    timestamp: cob::Timestamp,
491}
492
493impl Run {
494    /// Create a new `Run` with a [`Url`] that ideally points to the log of that
495    /// process.
496    pub fn new(log: Url, timestamp: cob::Timestamp) -> Self {
497        Self {
498            status: Status::Started,
499            log,
500            timestamp,
501        }
502    }
503
504    /// Mark the `Run` as finished.
505    fn finish(self, reason: Reason, timestamp: cob::Timestamp) -> Self {
506        Self {
507            status: Status::Finished(reason),
508            log: self.log,
509            timestamp,
510        }
511    }
512
513    /// Return URL for the log of this run.
514    pub fn log(&self) -> &Url {
515        &self.log
516    }
517
518    /// Return latest [`Status`] of the `Run`.
519    pub fn status(&self) -> &Status {
520        &self.status
521    }
522
523    /// Return the timestamp of the last update to the `Run`.
524    pub fn timestamp(&self) -> &cob::Timestamp {
525        &self.timestamp
526    }
527
528    /// Returns `true` if the status of the `Run` is [`Status::Started`].
529    pub fn is_started(&self) -> bool {
530        match self.status {
531            Status::Started => true,
532            Status::Finished(_) => false,
533        }
534    }
535
536    /// Returns `true` if the status of the `Run` is [`Status::Finished`].
537    pub fn is_finished(&self) -> bool {
538        !self.is_started()
539    }
540
541    /// Returns `true` if the status of the `Run` is [`Status::Finished`] and
542    /// the reason for finishing is [`Reason::Succeeded`].
543    pub fn succeeded(&self) -> bool {
544        match self.status {
545            Status::Started => false,
546            Status::Finished(Reason::Failed) => false,
547            Status::Finished(Reason::Succeeded) => true,
548        }
549    }
550
551    /// Returns `true` if the status of the `Run` is [`Status::Finished`] and
552    /// the reason for finishing is [`Reason::Failed`].
553    pub fn failed(&self) -> bool {
554        match self.status {
555            Status::Started => false,
556            Status::Finished(Reason::Failed) => true,
557            Status::Finished(Reason::Succeeded) => false,
558        }
559    }
560}
561
562/// The status of a [`Run`].
563#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
564pub enum Status {
565    /// The [`Run`] has started.
566    Started,
567    /// The [`Run`] has finished with the given [`Reason`].
568    Finished(Reason),
569}
570
571/// The reason for a [`Status`] to have finished.
572#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
573pub enum Reason {
574    /// The [`Run`] finished and failed.
575    Failed,
576    /// The [`Run`] finished and succeeded.
577    Succeeded,
578}
579
580/// The storage for all [`Job`] items.
581///
582/// To get a handle for [`Jobs`] use [`Jobs::open`].
583///
584/// The read-only operations for [`Jobs`] are:
585///
586///   - [`Jobs::counts`]
587///   - [`Jobs::get`]
588///
589/// The write operations for [`Jobs`] are:
590///
591///   - [`Jobs::create`]
592///   - [`Jobs::get_mut`]
593pub struct Jobs<'a, R> {
594    raw: store::Store<'a, Job, R>,
595}
596
597impl<'a, R> Deref for Jobs<'a, R> {
598    type Target = store::Store<'a, Job, R>;
599
600    fn deref(&self) -> &Self::Target {
601        &self.raw
602    }
603}
604
605impl<'a, R> Jobs<'a, R>
606where
607    R: ReadRepository + cob::Store<Namespace = NodeId>,
608{
609    /// Open a jobs store.
610    pub fn open(repository: &'a R) -> Result<Self, RepositoryError> {
611        let identity = repository.identity_head()?;
612        let raw = store::Store::open(repository)?.identity(identity);
613
614        Ok(Self { raw })
615    }
616
617    /// Return the number of [`Job`]s in the store.
618    pub fn counts(&self) -> Result<usize, store::Error> {
619        Ok(self.all()?.count())
620    }
621
622    /// Get a [`Job`], given its [`JobId`] identifier.
623    pub fn get(&self, id: &JobId) -> Result<Option<Job>, store::Error> {
624        self.raw.get(id.as_object_id())
625    }
626
627    /// Find the [`Job`]s that are associated with the `wanted` commit.
628    pub fn find_by_commit(&self, wanted: Oid) -> Result<FindByCommit<'a>, store::Error> {
629        FindByCommit::new(self, wanted)
630    }
631}
632
633/// [`Iterator`] for finding each [`Job`] where the [`Job::oid`] matches the
634/// wanted commit. See [`Jobs::find_by_commit`].
635pub struct FindByCommit<'a> {
636    jobs: Box<dyn Iterator<Item = Result<(ObjectId, Job), cob::store::Error>> + 'a>,
637    needle: Oid,
638}
639
640impl<'a> FindByCommit<'a> {
641    fn new<R>(jobs: &Jobs<'a, R>, needle: Oid) -> Result<Self, cob::store::Error>
642    where
643        R: ReadRepository + cob::Store<Namespace = NodeId>,
644    {
645        Ok(Self {
646            jobs: Box::new(jobs.all()?),
647            needle,
648        })
649    }
650
651    fn wanted(&self, job: &Job) -> bool {
652        self.needle == *job.oid()
653    }
654}
655
656impl Iterator for FindByCommit<'_> {
657    type Item = Result<(JobId, Job), cob::store::Error>;
658
659    fn next(&mut self) -> Option<Self::Item> {
660        loop {
661            let job = self.jobs.next()?;
662            match job {
663                Ok((id, job)) if self.wanted(&job) => return Some(Ok((JobId::from(id), job))),
664                Ok(_) => continue,
665                Err(err) => return Some(Err(err)),
666            }
667        }
668    }
669}
670
671impl<'a, R> Jobs<'a, R>
672where
673    R: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
674{
675    /// Get a [`JobMut`], given its [`JobId`] identifier.
676    pub fn get_mut<'g>(&'g mut self, id: &JobId) -> Result<JobMut<'a, 'g, R>, store::Error> {
677        let job = self
678            .raw
679            .get(id.as_object_id())?
680            .ok_or_else(move || store::Error::NotFound(TYPENAME.clone(), (*id).into()))?;
681
682        Ok(JobMut {
683            id: *id,
684            job,
685            store: self,
686        })
687    }
688
689    /// Create a new [`Job`] in the repository.
690    pub fn create<'g, G>(
691        &'g mut self,
692        oid: Oid,
693        signer: &Device<G>,
694    ) -> Result<JobMut<'a, 'g, R>, store::Error>
695    where
696        G: Signer<crypto::Signature>,
697    {
698        let (id, job) = store::Transaction::initial::<_, _, Transaction<R>>(
699            "Request job",
700            &mut self.raw,
701            signer,
702            |tx, _| {
703                tx.request(oid)?;
704                Ok(())
705            },
706        )?;
707
708        Ok(JobMut {
709            id: id.into(),
710            job,
711            store: self,
712        })
713    }
714}
715
716/// A `JobMut` is a [`Job`] where the underlying `Job` can be mutated by
717/// applying actions to it.
718pub struct JobMut<'a, 'g, R> {
719    /// Git object that the [`Job`] applies to.
720    pub id: JobId,
721
722    job: Job,
723    store: &'g mut Jobs<'a, R>,
724}
725
726impl<R> Deref for JobMut<'_, '_, R> {
727    type Target = Job;
728
729    fn deref(&self) -> &Self::Target {
730        &self.job
731    }
732}
733
734impl<'a, 'g, R> JobMut<'a, 'g, R>
735where
736    R: WriteRepository + cob::Store<Namespace = NodeId>,
737{
738    /// Create a new `JobMut`.
739    pub fn new(id: JobId, job: Job, store: &'g mut Jobs<'a, R>) -> Self {
740        Self { id, job, store }
741    }
742
743    /// The COB identifier for the underlying [`Job`].
744    pub fn id(&self) -> &JobId {
745        &self.id
746    }
747
748    /// Reload the [`Job`] data from underlying storage.
749    pub fn reload(&mut self) -> Result<(), store::Error> {
750        self.job = self
751            .store
752            .get(&self.id)?
753            .ok_or_else(|| store::Error::NotFound(TYPENAME.clone(), *self.id.as_object_id()))?;
754
755        Ok(())
756    }
757
758    /// Start a new [`Run`] for the node, where the run is identified by the
759    /// given [`Uuid`].
760    pub fn run<G>(
761        &mut self,
762        uuid: Uuid,
763        log: Url,
764        signer: &Device<G>,
765    ) -> Result<EntryId, store::Error>
766    where
767        G: Signer<crypto::Signature>,
768    {
769        self.transaction("Run node job", signer, |tx| tx.run(uuid, log))
770    }
771
772    /// Finish a [`Run`], identified by the given [`Uuid`], for the node, with
773    /// the provided [`Reason`].
774    pub fn finish<G>(
775        &mut self,
776        uuid: Uuid,
777        reason: Reason,
778        signer: &Device<G>,
779    ) -> Result<EntryId, store::Error>
780    where
781        G: Signer<crypto::Signature>,
782    {
783        self.transaction("Finished node job", signer, |tx| tx.finish(uuid, reason))
784    }
785
786    /// Apply COB operations to a `JobMut`.
787    fn transaction<G, F>(
788        &mut self,
789        message: &str,
790        signer: &Device<G>,
791        operations: F,
792    ) -> Result<EntryId, store::Error>
793    where
794        G: Signer<crypto::Signature>,
795        F: FnOnce(&mut Transaction<R>) -> Result<(), store::Error>,
796    {
797        let mut tx = Transaction::default();
798        operations(&mut tx)?;
799
800        let (job, commit) =
801            tx.0.commit(message, self.id.into(), &mut self.store.raw, signer)?;
802        self.job = job;
803
804        Ok(commit)
805    }
806}
807
808/// An update for the `Job` COB. This applies a change to the
809/// in-memory computed representation of the COB.
810struct Transaction<R: ReadRepository>(store::Transaction<Job, R>);
811
812impl<R> From<store::Transaction<Job, R>> for Transaction<R>
813where
814    R: ReadRepository,
815{
816    fn from(tx: store::Transaction<Job, R>) -> Self {
817        Self(tx)
818    }
819}
820
821impl<R> From<Transaction<R>> for store::Transaction<Job, R>
822where
823    R: ReadRepository,
824{
825    fn from(Transaction(tx): Transaction<R>) -> Self {
826        tx
827    }
828}
829
830impl<R> Default for Transaction<R>
831where
832    R: ReadRepository,
833{
834    fn default() -> Self {
835        Self(Default::default())
836    }
837}
838
839impl<R> Deref for Transaction<R>
840where
841    R: ReadRepository,
842{
843    type Target = store::Transaction<Job, R>;
844
845    fn deref(&self) -> &Self::Target {
846        &self.0
847    }
848}
849
850impl<R> DerefMut for Transaction<R>
851where
852    R: ReadRepository,
853{
854    fn deref_mut(&mut self) -> &mut Self::Target {
855        &mut self.0
856    }
857}
858
859impl<R> Transaction<R>
860where
861    R: ReadRepository,
862{
863    /// Add a request operation to the transaction.
864    fn request(&mut self, oid: Oid) -> Result<(), store::Error> {
865        self.0.push(Action::Request { oid })
866    }
867
868    /// Add a new `Run` to a transaction.
869    fn run(&mut self, uuid: Uuid, log: Url) -> Result<(), store::Error> {
870        self.0.push(Action::Run { uuid, log })
871    }
872
873    /// Mark a run as finished.
874    fn finish(&mut self, uuid: Uuid, reason: Reason) -> Result<(), store::Error> {
875        self.0.push(Action::Finished { uuid, reason })
876    }
877}
878
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
    use radicle::git::{raw::Repository, Oid};
    use radicle::test;
    use url::Url;
    use uuid::Uuid;

    use crate::{Jobs, Reason, Run, Runs, Status};

    /// Generate a fresh run identifier together with a matching log URL.
    fn node_run() -> (Uuid, Url) {
        let uuid = Uuid::new_v4();
        let raw = format!("https://example.com/ci/logs?run={uuid}");
        (uuid, Url::parse(&raw).unwrap())
    }

    /// Write a parentless commit with an empty tree to `repo` and return its
    /// id.
    fn commit(repo: &Repository) -> Oid {
        let builder = repo.treebuilder(None).unwrap();
        let tree_oid = builder.write().unwrap();
        let tree = repo.find_tree(tree_oid).unwrap();

        let author = repo.signature().unwrap();
        let commit = repo
            .commit(None, &author, &author, "Test Commit", &tree, &[])
            .unwrap();
        commit.into()
    }

    /// Two nodes each start a run for the same job and finish it with
    /// different reasons.
    #[test]
    fn e2e() {
        let test::setup::NodeWithRepo {
            node: alice, repo, ..
        } = test::setup::NodeWithRepo::default();
        let oid = commit(&repo.backend);
        let mut jobs = Jobs::open(&*repo).unwrap();

        let test::setup::NodeWithRepo { node: bob, .. } = test::setup::NodeWithRepo::default();
        let mut job = jobs.create(oid, &alice.signer).unwrap();

        let alice_key = alice.signer.public_key();
        let bob_key = bob.signer.public_key();

        // Both nodes start a run.
        let (alice_uuid, alice_log) = node_run();
        job.run(alice_uuid, alice_log.clone(), &alice.signer)
            .unwrap();

        let (bob_uuid, bob_log) = node_run();
        job.run(bob_uuid, bob_log.clone(), &bob.signer).unwrap();

        let alice_runs = job.runs_of(alice_key).unwrap();
        assert!(alice_runs.contains_key(&alice_uuid));
        let run = alice_runs.get(&alice_uuid).unwrap();
        assert_eq!(run.status, Status::Started);
        assert_eq!(run.log, alice_log);

        let bob_runs = job.runs_of(bob_key).unwrap();
        assert!(bob_runs.contains_key(&bob_uuid));
        let run = bob_runs.get(&bob_uuid).unwrap();
        assert_eq!(run.status, Status::Started);
        assert_eq!(run.log, bob_log);

        // Alice finishes first; Bob's run is still going.
        job.finish(alice_uuid, Reason::Succeeded, &alice.signer)
            .unwrap();

        let finished = job.finished();
        assert!(finished.contains_key(alice_key));
        assert!(!finished.contains_key(bob_key));

        // Bob finishes too; nothing is left in the started state.
        job.finish(bob_uuid, Reason::Failed, &bob.signer).unwrap();

        let succeeded = job.succeeded();
        assert!(succeeded.contains_key(alice_key));
        assert!(!succeeded.contains_key(bob_key));
        let failed = job.failed();
        assert!(!failed.contains_key(alice_key));
        assert!(failed.contains_key(bob_key));
        let started = job.started();
        assert!(started.is_empty());
    }

    /// Creating a job for a commit that the repository does not have fails.
    #[test]
    fn missing_commit() {
        let test::setup::NodeWithRepo {
            node: alice, repo, ..
        } = test::setup::NodeWithRepo::default();
        let mut jobs = Jobs::open(&*repo).unwrap();
        let oid = test::arbitrary::oid();
        let created = jobs.create(oid, &alice.signer);
        assert!(created.is_err())
    }

    /// Creating a job twice for the same commit yields the same job.
    #[test]
    fn idempotent_create() {
        let test::setup::NodeWithRepo {
            node: alice, repo, ..
        } = test::setup::NodeWithRepo::default();
        let oid = commit(&repo.backend);
        let mut jobs = Jobs::open(&*repo).unwrap();
        let first = jobs.create(oid, &alice.signer).unwrap().id;
        let second = jobs.create(oid, &alice.signer).unwrap().id;

        assert_eq!(first, second);
        assert_eq!(jobs.get(&first).unwrap(), jobs.get(&second).unwrap());
    }

    /// Iterating `Runs` yields the runs in the order they were inserted.
    #[test]
    fn runs_insertion_order_iteration() {
        let mut runs = Runs::default();
        let uuids = (0..10).map(|_| Uuid::new_v4()).collect::<Vec<_>>();
        for uuid in &uuids {
            let run = Run {
                status: Status::Started,
                log: Url::parse("https://example.com/ci/logs").unwrap(),
                timestamp: radicle::cob::Timestamp::from_secs(1358182),
            };
            runs.insert(*uuid, run);
        }

        let iterated = runs.iter().map(|(uuid, _)| *uuid).collect::<Vec<_>>();
        assert_eq!(uuids, iterated)
    }
}
1011}