// graft_kernel/rt/runtime.rs

1use std::{sync::Arc, time::Duration};
2
3use bytestring::ByteString;
4use culprit::ResultExt;
5use graft_core::{
6    PageCount, PageIdx, SegmentId, VolumeId, checksum::Checksum, commit::SegmentIdx, lsn::LSN,
7    page::Page, pageset::PageSet,
8};
9use tracing::Instrument;
10use tryiter::TryIteratorExt;
11
12use crate::{
13    KernelErr,
14    graft::{Graft, GraftStatus},
15    graft_reader::GraftReader,
16    graft_writer::GraftWriter,
17    remote::Remote,
18    rt::{
19        action::{Action, FetchSegment, FetchVolume, HydrateSnapshot, RemoteCommit},
20        task::{autosync::AutosyncTask, supervise},
21    },
22    snapshot::Snapshot,
23};
24
25use crate::local::fjall_storage::FjallStorage;
26
/// Crate-local result alias: every fallible runtime operation reports a
/// `KernelErr` wrapped in a culprit context.
type Result<T> = culprit::Result<T, KernelErr>;
28
/// Cheaply-cloneable handle to the Graft runtime.
///
/// All clones share the same `RuntimeInner` via `Arc`, so cloning a
/// `Runtime` hands out another handle to the same storage/remote/Tokio state.
#[derive(Clone, Debug)]
pub struct Runtime {
    inner: Arc<RuntimeInner>,
}
33
/// Shared state behind a `Runtime` handle.
#[derive(Debug)]
struct RuntimeInner {
    // handle used to block on / spawn async actions (see `run_action`)
    tokio: tokio::runtime::Handle,
    // local durable storage
    storage: Arc<FjallStorage>,
    // remote endpoint used by fetch/commit actions
    remote: Arc<Remote>,
}
40
41impl Runtime {
42    /// Create a Graft `Runtime` wrapping the provided Tokio runtime handle.
43    pub fn new(
44        tokio_rt: tokio::runtime::Handle,
45        remote: Arc<Remote>,
46        storage: Arc<FjallStorage>,
47        autosync: Option<Duration>,
48    ) -> Runtime {
49        // spin up background tasks as needed
50        if let Some(interval) = autosync {
51            let _guard = tokio_rt.enter();
52            let mut ticker = tokio::time::interval(interval);
53            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
54            tokio_rt.spawn(supervise(
55                storage.clone(),
56                remote.clone(),
57                AutosyncTask::new(ticker),
58            ));
59        }
60        Runtime {
61            inner: Arc::new(RuntimeInner { tokio: tokio_rt, storage, remote }),
62        }
63    }
64
65    pub(crate) fn storage(&self) -> &FjallStorage {
66        &self.inner.storage
67    }
68
69    pub(crate) fn create_staged_segment(&self) -> SegmentIdx {
70        // TODO: need to keep track of staged segments in memory to prevent the GC from clearing them
71        SegmentIdx::new(SegmentId::random(), PageSet::default())
72    }
73
74    pub(crate) fn read_page(&self, snapshot: &Snapshot, pageidx: PageIdx) -> Result<Page> {
75        let reader = self.storage().read();
76        if let Some(commit) = reader.search_page(snapshot, pageidx).or_into_ctx()? {
77            let idx = commit
78                .segment_idx()
79                .expect("BUG: commit claims to contain pageidx");
80
81            if let Some(page) = reader.read_page(idx.sid().clone(), pageidx).or_into_ctx()? {
82                return Ok(page);
83            }
84
85            // fallthrough to loading the page from the remote
86            let range = idx
87                .frame_for_pageidx(pageidx)
88                .expect("BUG: no frame for pageidx");
89
90            // fetch the segment frame containing the page
91            self.run_action(FetchSegment { range })?;
92
93            // now that we've fetched the segment, read the page again using a
94            // fresh storage reader
95            Ok(self
96                .storage()
97                .read()
98                .read_page(idx.sid.clone(), pageidx)
99                .or_into_ctx()?
100                .expect("BUG: page not found after fetching"))
101        } else {
102            Ok(Page::EMPTY)
103        }
104    }
105
106    fn run_action<A: Action>(&self, action: A) -> Result<()> {
107        let span = tracing::debug_span!("Action::run", ?action);
108
109        self.inner.tokio.block_on(
110            action
111                .run(&self.inner.storage, &self.inner.remote)
112                .instrument(span),
113        )
114    }
115}
116
117// tag methods
118impl Runtime {
119    pub fn tag_iter(&self) -> impl Iterator<Item = Result<(ByteString, VolumeId)>> {
120        self.storage()
121            .read()
122            .iter_tags()
123            .map_err(|err| err.map_ctx(KernelErr::from))
124    }
125
126    pub fn tag_exists(&self, name: &str) -> Result<bool> {
127        self.storage().read().tag_exists(name).or_into_ctx()
128    }
129
130    pub fn tag_get(&self, tag: &str) -> Result<Option<VolumeId>> {
131        self.storage().read().get_tag(tag).or_into_ctx()
132    }
133
134    /// retrieves the `VolumeId` for a tag, replacing it with the provided graft
135    pub fn tag_replace(&self, tag: &str, graft: VolumeId) -> Result<Option<VolumeId>> {
136        self.storage()
137            .read_write()
138            .tag_replace(tag, graft)
139            .or_into_ctx()
140    }
141
142    pub fn tag_delete(&self, tag: &str) -> Result<()> {
143        self.storage().tag_delete(tag).or_into_ctx()
144    }
145}
146
147// graft methods
148impl Runtime {
149    pub fn graft_iter(&self) -> impl Iterator<Item = Result<Graft>> {
150        self.storage()
151            .read()
152            .iter_grafts()
153            .map_err(|err| err.map_ctx(KernelErr::from))
154    }
155
156    pub fn graft_exists(&self, graft: &VolumeId) -> Result<bool> {
157        self.storage().read().graft_exists(graft).or_into_ctx()
158    }
159
160    /// opens a graft. if either the graft's `VolumeId` or the remote's `VolumeId`
161    /// are missing, they will be randomly generated. If the graft already
162    /// exists, this function will fail if its remote doesn't match.
163    pub fn graft_open(&self, graft: Option<VolumeId>, remote: Option<VolumeId>) -> Result<Graft> {
164        self.storage()
165            .read_write()
166            .graft_open(graft, remote)
167            .or_into_ctx()
168    }
169
170    /// creates a new graft by forking an existing snapshot
171    pub fn graft_from_snapshot(&self, snapshot: &Snapshot) -> Result<Graft> {
172        self.storage().graft_from_snapshot(snapshot).or_into_ctx()
173    }
174
175    /// retrieves an existing graft. returns `LogicalErr::GraftNotFound` if missing
176    pub fn graft_get(&self, graft: &VolumeId) -> Result<Graft> {
177        self.storage().read().graft(graft).or_into_ctx()
178    }
179
180    /// removes a graft but leaves the underlying volumes in place
181    pub fn graft_delete(&self, graft: &VolumeId) -> Result<()> {
182        self.storage().graft_delete(graft).or_into_ctx()
183    }
184
185    /// fetches the latest changes to the remote and then pulls them into the
186    /// graft
187    pub fn graft_pull(&self, graft: VolumeId) -> Result<()> {
188        let graft = self.inner.storage.read().graft(&graft).or_into_ctx()?;
189        self.fetch_volume(graft.remote, None)?;
190        self.storage()
191            .read_write()
192            .sync_remote_to_local(graft.local)
193            .or_into_ctx()
194    }
195
196    pub fn graft_push(&self, graft: VolumeId) -> Result<()> {
197        self.run_action(RemoteCommit { graft })
198    }
199
200    pub fn graft_status(&self, graft: &VolumeId) -> Result<GraftStatus> {
201        let reader = self.storage().read();
202        let state = reader.graft(graft).or_into_ctx()?;
203        let latest_local = reader.latest_lsn(&state.local).or_into_ctx()?;
204        let latest_remote = reader.latest_lsn(&state.remote).or_into_ctx()?;
205        Ok(state.status(latest_local, latest_remote))
206    }
207
208    pub fn graft_snapshot(&self, graft: &VolumeId) -> Result<Snapshot> {
209        self.storage().read().snapshot(graft).or_into_ctx()
210    }
211
212    pub fn graft_reader(&self, graft: VolumeId) -> Result<GraftReader> {
213        let snapshot = self.graft_snapshot(&graft)?;
214        Ok(GraftReader::new(self.clone(), graft, snapshot))
215    }
216
217    pub fn graft_writer(&self, graft: VolumeId) -> Result<GraftWriter> {
218        let snapshot = self.graft_snapshot(&graft)?;
219        let page_count = self.snapshot_pages(&snapshot)?;
220        Ok(GraftWriter::new(self.clone(), graft, snapshot, page_count))
221    }
222}
223
224// volume methods
225impl Runtime {
226    pub fn fetch_volume(&self, vid: VolumeId, max_lsn: Option<LSN>) -> Result<()> {
227        self.run_action(FetchVolume { vid, max_lsn })
228    }
229}
230
231// snapshot methods
232impl Runtime {
233    /// returns the total number of pages in the snapshot
234    pub fn snapshot_pages(&self, snapshot: &Snapshot) -> Result<PageCount> {
235        if let Some((vid, lsn)) = snapshot.head() {
236            Ok(self
237                .storage()
238                .read()
239                .page_count(vid, lsn)
240                .or_into_ctx()?
241                .expect("BUG: missing head commit for snapshot"))
242        } else {
243            Ok(PageCount::ZERO)
244        }
245    }
246
247    pub fn snapshot_is_latest(&self, graft: &VolumeId, snapshot: &Snapshot) -> Result<bool> {
248        self.storage()
249            .read()
250            .is_latest_snapshot(graft, snapshot)
251            .or_into_ctx()
252    }
253
254    /// returns the checksum of the snapshot
255    pub fn snapshot_checksum(&self, snapshot: &Snapshot) -> Result<Checksum> {
256        self.storage().read().checksum(snapshot).or_into_ctx()
257    }
258
259    pub fn snapshot_missing_pages(&self, snapshot: &Snapshot) -> Result<PageSet> {
260        let missing_frames = self
261            .storage()
262            .read()
263            .find_missing_frames(snapshot)
264            .or_into_ctx()?;
265        // merge missing_frames into a single PageSet
266        Ok(missing_frames
267            .into_iter()
268            .fold(PageSet::EMPTY, |mut pageset, frame| {
269                pageset |= frame.pageset;
270                pageset
271            }))
272    }
273
274    pub fn snapshot_hydrate(&self, snapshot: Snapshot) -> Result<()> {
275        self.run_action(HydrateSnapshot { snapshot })
276    }
277}
278
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use graft_core::{PageIdx, VolumeId, page::Page};
    use tokio::time::sleep;

    use crate::{
        graft_reader::GraftRead, graft_writer::GraftWrite, local::fjall_storage::FjallStorage,
        remote::RemoteConfig, rt::runtime::Runtime,
    };

    /// End-to-end sanity check: write pages through one runtime, autosync
    /// them to a shared in-memory remote, and read them back through a
    /// second runtime (and vice versa).
    #[graft_test::test]
    fn runtime_sanity() {
        // paused-time current-thread runtime: `sleep` advances virtual time
        // instantly, letting background autosync tasks run deterministically
        let tokio_rt = tokio::runtime::Builder::new_current_thread()
            .start_paused(true)
            .enable_all()
            .build()
            .unwrap();

        let remote = Arc::new(RemoteConfig::Memory.build().unwrap());
        let storage = Arc::new(FjallStorage::open_temporary().unwrap());
        // autosync every (virtual) second
        let runtime = Runtime::new(
            tokio_rt.handle().clone(),
            remote.clone(),
            storage,
            Some(Duration::from_secs(1)),
        );

        let remote_vid = VolumeId::random();
        let graft = runtime
            .graft_open(None, Some(remote_vid.clone()))
            .unwrap()
            .local;

        // freshly opened graft: no local commits, no remote commits
        assert_eq!(runtime.graft_status(&graft).unwrap().to_string(), "_ r_",);

        // sanity check volume writer semantics
        let mut writer = runtime.graft_writer(graft.clone()).unwrap();
        for i in [1u8, 2, 5, 9] {
            let pageidx = PageIdx::must_new(i as u32);
            let page = Page::test_filled(i);
            writer.write_page(pageidx, page.clone()).unwrap();
            // read-your-writes within the uncommitted writer
            assert_eq!(writer.read_page(pageidx).unwrap(), page);
        }
        writer.commit().unwrap();

        // one local commit, still nothing on the remote
        assert_eq!(runtime.graft_status(&graft).unwrap().to_string(), "1 r_",);

        // sanity check volume reader semantics
        let reader = runtime.graft_reader(graft.clone()).unwrap();
        tracing::debug!("got snapshot {:?}", reader.snapshot());
        for i in [1u8, 2, 5, 9] {
            let pageidx = PageIdx::must_new(i as u32);
            let page = Page::test_filled(i);
            assert_eq!(
                reader.read_page(pageidx).unwrap().into_bytes(),
                page.into_bytes()
            );
        }

        // create a second runtime connected to the same remote
        let storage = Arc::new(FjallStorage::open_temporary().unwrap());
        let runtime_2 = Runtime::new(
            tokio_rt.handle().clone(),
            remote.clone(),
            storage,
            Some(Duration::from_secs(1)),
        );

        // open the same remote volume in the second runtime
        let graft_2 = runtime_2.graft_open(None, Some(remote_vid)).unwrap().local;

        // let both runtimes run for a little while
        tokio_rt.block_on(async {
            // this sleep lets tokio advance time, allowing the runtime to flush all its jobs
            sleep(Duration::from_secs(5)).await;
            let tree = remote.testonly_format_tree().await;
            tracing::info!("remote tree\n{tree}")
        });

        // first runtime's commit has reached the remote; the second runtime
        // sees the remote commit but has no local commit of its own yet
        assert_eq!(runtime.graft_status(&graft).unwrap().to_string(), "1 r1",);
        assert_eq!(
            runtime_2.graft_status(&graft_2).unwrap().to_string(),
            "_ r1",
        );

        // sanity check volume reader semantics in the second runtime
        // (spawn_blocking because read_page may block on a remote fetch)
        let reader_2 = runtime_2.graft_reader(graft_2.clone()).unwrap();
        let task = tokio_rt.spawn_blocking(move || {
            for i in [1u8, 2, 5, 9] {
                let pageidx = PageIdx::must_new(i as u32);
                tracing::info!("checking page {pageidx}");
                let expected = Page::test_filled(i);
                let actual = reader_2.read_page(pageidx).unwrap();
                assert_eq!(expected, actual, "read unexpected page contents");
            }
        });
        tokio_rt.block_on(task).unwrap();

        // now write to the second volume, and sync back to the first
        let mut writer_2 = runtime_2.graft_writer(graft_2.clone()).unwrap();
        for i in [3u8, 4, 5, 7] {
            let pageidx = PageIdx::must_new(i as u32);
            // distinct fill pattern so overwritten pages are distinguishable
            let page = Page::test_filled(i + 10);
            writer_2.write_page(pageidx, page.clone()).unwrap();
            assert_eq!(writer_2.read_page(pageidx).unwrap(), page);
        }
        writer_2.commit().unwrap();

        // let both runtimes run for a little while
        tokio_rt.block_on(async {
            // this sleep lets tokio advance time, allowing the runtime to flush all its jobs
            sleep(Duration::from_secs(5)).await;
            let tree = remote.testonly_format_tree().await;
            tracing::info!("remote tree\n{tree}")
        });

        // both runtimes now see two remote commits
        assert_eq!(runtime.graft_status(&graft).unwrap().to_string(), "1 r2",);
        assert_eq!(
            runtime_2.graft_status(&graft_2).unwrap().to_string(),
            "1 r2",
        );

        // sanity check volume reader semantics in the first runtime
        let reader = runtime.graft_reader(graft.clone()).unwrap();
        let task = tokio_rt.spawn_blocking(move || {
            for i in [3u8, 4, 5, 7] {
                let pageidx = PageIdx::must_new(i as u32);
                tracing::info!("checking page {pageidx}");
                let expected = Page::test_filled(i + 10);
                let actual = reader.read_page(pageidx).unwrap();
                assert_eq!(expected, actual, "read unexpected page contents");
            }
        });
        tokio_rt.block_on(task).unwrap();
    }
}