1use std::{sync::Arc, time::Duration};
2
3use bytestring::ByteString;
4use culprit::ResultExt;
5use graft_core::{
6 PageCount, PageIdx, SegmentId, VolumeId, checksum::Checksum, commit::SegmentIdx, lsn::LSN,
7 page::Page, pageset::PageSet,
8};
9use tracing::Instrument;
10use tryiter::TryIteratorExt;
11
12use crate::{
13 KernelErr,
14 graft::{Graft, GraftStatus},
15 graft_reader::GraftReader,
16 graft_writer::GraftWriter,
17 remote::Remote,
18 rt::{
19 action::{Action, FetchSegment, FetchVolume, HydrateSnapshot, RemoteCommit},
20 task::{autosync::AutosyncTask, supervise},
21 },
22 snapshot::Snapshot,
23};
24
25use crate::local::fjall_storage::FjallStorage;
26
27type Result<T> = culprit::Result<T, KernelErr>;
28
29#[derive(Clone, Debug)]
30pub struct Runtime {
31 inner: Arc<RuntimeInner>,
32}
33
34#[derive(Debug)]
35struct RuntimeInner {
36 tokio: tokio::runtime::Handle,
37 storage: Arc<FjallStorage>,
38 remote: Arc<Remote>,
39}
40
41impl Runtime {
42 pub fn new(
44 tokio_rt: tokio::runtime::Handle,
45 remote: Arc<Remote>,
46 storage: Arc<FjallStorage>,
47 autosync: Option<Duration>,
48 ) -> Runtime {
49 if let Some(interval) = autosync {
51 let _guard = tokio_rt.enter();
52 let mut ticker = tokio::time::interval(interval);
53 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
54 tokio_rt.spawn(supervise(
55 storage.clone(),
56 remote.clone(),
57 AutosyncTask::new(ticker),
58 ));
59 }
60 Runtime {
61 inner: Arc::new(RuntimeInner { tokio: tokio_rt, storage, remote }),
62 }
63 }
64
65 pub(crate) fn storage(&self) -> &FjallStorage {
66 &self.inner.storage
67 }
68
69 pub(crate) fn create_staged_segment(&self) -> SegmentIdx {
70 SegmentIdx::new(SegmentId::random(), PageSet::default())
72 }
73
74 pub(crate) fn read_page(&self, snapshot: &Snapshot, pageidx: PageIdx) -> Result<Page> {
75 let reader = self.storage().read();
76 if let Some(commit) = reader.search_page(snapshot, pageidx).or_into_ctx()? {
77 let idx = commit
78 .segment_idx()
79 .expect("BUG: commit claims to contain pageidx");
80
81 if let Some(page) = reader.read_page(idx.sid().clone(), pageidx).or_into_ctx()? {
82 return Ok(page);
83 }
84
85 let range = idx
87 .frame_for_pageidx(pageidx)
88 .expect("BUG: no frame for pageidx");
89
90 self.run_action(FetchSegment { range })?;
92
93 Ok(self
96 .storage()
97 .read()
98 .read_page(idx.sid.clone(), pageidx)
99 .or_into_ctx()?
100 .expect("BUG: page not found after fetching"))
101 } else {
102 Ok(Page::EMPTY)
103 }
104 }
105
106 fn run_action<A: Action>(&self, action: A) -> Result<()> {
107 let span = tracing::debug_span!("Action::run", ?action);
108
109 self.inner.tokio.block_on(
110 action
111 .run(&self.inner.storage, &self.inner.remote)
112 .instrument(span),
113 )
114 }
115}
116
117impl Runtime {
119 pub fn tag_iter(&self) -> impl Iterator<Item = Result<(ByteString, VolumeId)>> {
120 self.storage()
121 .read()
122 .iter_tags()
123 .map_err(|err| err.map_ctx(KernelErr::from))
124 }
125
126 pub fn tag_exists(&self, name: &str) -> Result<bool> {
127 self.storage().read().tag_exists(name).or_into_ctx()
128 }
129
130 pub fn tag_get(&self, tag: &str) -> Result<Option<VolumeId>> {
131 self.storage().read().get_tag(tag).or_into_ctx()
132 }
133
134 pub fn tag_replace(&self, tag: &str, graft: VolumeId) -> Result<Option<VolumeId>> {
136 self.storage()
137 .read_write()
138 .tag_replace(tag, graft)
139 .or_into_ctx()
140 }
141
142 pub fn tag_delete(&self, tag: &str) -> Result<()> {
143 self.storage().tag_delete(tag).or_into_ctx()
144 }
145}
146
147impl Runtime {
149 pub fn graft_iter(&self) -> impl Iterator<Item = Result<Graft>> {
150 self.storage()
151 .read()
152 .iter_grafts()
153 .map_err(|err| err.map_ctx(KernelErr::from))
154 }
155
156 pub fn graft_exists(&self, graft: &VolumeId) -> Result<bool> {
157 self.storage().read().graft_exists(graft).or_into_ctx()
158 }
159
160 pub fn graft_open(&self, graft: Option<VolumeId>, remote: Option<VolumeId>) -> Result<Graft> {
164 self.storage()
165 .read_write()
166 .graft_open(graft, remote)
167 .or_into_ctx()
168 }
169
170 pub fn graft_from_snapshot(&self, snapshot: &Snapshot) -> Result<Graft> {
172 self.storage().graft_from_snapshot(snapshot).or_into_ctx()
173 }
174
175 pub fn graft_get(&self, graft: &VolumeId) -> Result<Graft> {
177 self.storage().read().graft(graft).or_into_ctx()
178 }
179
180 pub fn graft_delete(&self, graft: &VolumeId) -> Result<()> {
182 self.storage().graft_delete(graft).or_into_ctx()
183 }
184
185 pub fn graft_pull(&self, graft: VolumeId) -> Result<()> {
188 let graft = self.inner.storage.read().graft(&graft).or_into_ctx()?;
189 self.fetch_volume(graft.remote, None)?;
190 self.storage()
191 .read_write()
192 .sync_remote_to_local(graft.local)
193 .or_into_ctx()
194 }
195
196 pub fn graft_push(&self, graft: VolumeId) -> Result<()> {
197 self.run_action(RemoteCommit { graft })
198 }
199
200 pub fn graft_status(&self, graft: &VolumeId) -> Result<GraftStatus> {
201 let reader = self.storage().read();
202 let state = reader.graft(graft).or_into_ctx()?;
203 let latest_local = reader.latest_lsn(&state.local).or_into_ctx()?;
204 let latest_remote = reader.latest_lsn(&state.remote).or_into_ctx()?;
205 Ok(state.status(latest_local, latest_remote))
206 }
207
208 pub fn graft_snapshot(&self, graft: &VolumeId) -> Result<Snapshot> {
209 self.storage().read().snapshot(graft).or_into_ctx()
210 }
211
212 pub fn graft_reader(&self, graft: VolumeId) -> Result<GraftReader> {
213 let snapshot = self.graft_snapshot(&graft)?;
214 Ok(GraftReader::new(self.clone(), graft, snapshot))
215 }
216
217 pub fn graft_writer(&self, graft: VolumeId) -> Result<GraftWriter> {
218 let snapshot = self.graft_snapshot(&graft)?;
219 let page_count = self.snapshot_pages(&snapshot)?;
220 Ok(GraftWriter::new(self.clone(), graft, snapshot, page_count))
221 }
222}
223
224impl Runtime {
226 pub fn fetch_volume(&self, vid: VolumeId, max_lsn: Option<LSN>) -> Result<()> {
227 self.run_action(FetchVolume { vid, max_lsn })
228 }
229}
230
231impl Runtime {
233 pub fn snapshot_pages(&self, snapshot: &Snapshot) -> Result<PageCount> {
235 if let Some((vid, lsn)) = snapshot.head() {
236 Ok(self
237 .storage()
238 .read()
239 .page_count(vid, lsn)
240 .or_into_ctx()?
241 .expect("BUG: missing head commit for snapshot"))
242 } else {
243 Ok(PageCount::ZERO)
244 }
245 }
246
247 pub fn snapshot_is_latest(&self, graft: &VolumeId, snapshot: &Snapshot) -> Result<bool> {
248 self.storage()
249 .read()
250 .is_latest_snapshot(graft, snapshot)
251 .or_into_ctx()
252 }
253
254 pub fn snapshot_checksum(&self, snapshot: &Snapshot) -> Result<Checksum> {
256 self.storage().read().checksum(snapshot).or_into_ctx()
257 }
258
259 pub fn snapshot_missing_pages(&self, snapshot: &Snapshot) -> Result<PageSet> {
260 let missing_frames = self
261 .storage()
262 .read()
263 .find_missing_frames(snapshot)
264 .or_into_ctx()?;
265 Ok(missing_frames
267 .into_iter()
268 .fold(PageSet::EMPTY, |mut pageset, frame| {
269 pageset |= frame.pageset;
270 pageset
271 }))
272 }
273
274 pub fn snapshot_hydrate(&self, snapshot: Snapshot) -> Result<()> {
275 self.run_action(HydrateSnapshot { snapshot })
276 }
277}
278
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use graft_core::{PageIdx, VolumeId, page::Page};
    use tokio::time::sleep;

    use crate::{
        graft_reader::GraftRead, graft_writer::GraftWrite, local::fjall_storage::FjallStorage,
        remote::RemoteConfig, rt::runtime::Runtime,
    };

    /// Page indices written through the first runtime; each page is filled
    /// with its own index.
    const FIRST_BATCH: [u8; 4] = [1, 2, 5, 9];

    /// Page indices written through the second runtime; each page is filled
    /// with its index plus [`SECOND_BATCH_FILL_OFFSET`].
    const SECOND_BATCH: [u8; 4] = [3, 4, 5, 7];

    /// Offset added to the fill byte of the second batch so its pages differ
    /// from the first batch (page 5 is written by both).
    const SECOND_BATCH_FILL_OFFSET: u8 = 10;

    /// Renders the status of `graft` on `rt` as a string.
    fn status(rt: &Runtime, graft: &VolumeId) -> String {
        rt.graft_status(graft).unwrap().to_string()
    }

    /// End-to-end check with two runtimes, each with its own temporary
    /// storage, sharing one in-memory remote and autosyncing every second on a
    /// paused-clock Tokio runtime.
    #[graft_test::test]
    fn runtime_sanity() {
        let tokio_rt = tokio::runtime::Builder::new_current_thread()
            .start_paused(true)
            .enable_all()
            .build()
            .unwrap();

        let remote = Arc::new(RemoteConfig::Memory.build().unwrap());

        // Sleeps five seconds on the runtime, then logs the remote's tree.
        let settle = || {
            tokio_rt.block_on(async {
                sleep(Duration::from_secs(5)).await;
                let tree = remote.testonly_format_tree().await;
                tracing::info!("remote tree\n{tree}")
            });
        };

        // --- first runtime: write and commit the first batch ---
        let storage_1 = Arc::new(FjallStorage::open_temporary().unwrap());
        let runtime = Runtime::new(
            tokio_rt.handle().clone(),
            remote.clone(),
            storage_1,
            Some(Duration::from_secs(1)),
        );

        let remote_vid = VolumeId::random();
        let graft = runtime
            .graft_open(None, Some(remote_vid.clone()))
            .unwrap()
            .local;

        assert_eq!(status(&runtime, &graft), "_ r_");

        let mut writer = runtime.graft_writer(graft.clone()).unwrap();
        for fill in FIRST_BATCH {
            let pageidx = PageIdx::must_new(u32::from(fill));
            let page = Page::test_filled(fill);
            writer.write_page(pageidx, page.clone()).unwrap();
            assert_eq!(writer.read_page(pageidx).unwrap(), page);
        }
        writer.commit().unwrap();

        assert_eq!(status(&runtime, &graft), "1 r_");

        let reader = runtime.graft_reader(graft.clone()).unwrap();
        tracing::debug!("got snapshot {:?}", reader.snapshot());
        for fill in FIRST_BATCH {
            let pageidx = PageIdx::must_new(u32::from(fill));
            let page = Page::test_filled(fill);
            assert_eq!(
                reader.read_page(pageidx).unwrap().into_bytes(),
                page.into_bytes()
            );
        }

        // --- second runtime: same remote volume, separate storage ---
        let storage_2 = Arc::new(FjallStorage::open_temporary().unwrap());
        let runtime_2 = Runtime::new(
            tokio_rt.handle().clone(),
            remote.clone(),
            storage_2,
            Some(Duration::from_secs(1)),
        );

        let graft_2 = runtime_2.graft_open(None, Some(remote_vid)).unwrap().local;

        settle();

        assert_eq!(status(&runtime, &graft), "1 r1");
        assert_eq!(status(&runtime_2, &graft_2), "_ r1");

        // Read the first batch through the second runtime on a blocking thread.
        let reader_2 = runtime_2.graft_reader(graft_2.clone()).unwrap();
        let first_batch_check = tokio_rt.spawn_blocking(move || {
            for fill in FIRST_BATCH {
                let pageidx = PageIdx::must_new(u32::from(fill));
                tracing::info!("checking page {pageidx}");
                let expected = Page::test_filled(fill);
                let actual = reader_2.read_page(pageidx).unwrap();
                assert_eq!(expected, actual, "read unexpected page contents");
            }
        });
        tokio_rt.block_on(first_batch_check).unwrap();

        // --- second runtime: write and commit the second batch ---
        let mut writer_2 = runtime_2.graft_writer(graft_2.clone()).unwrap();
        for idx in SECOND_BATCH {
            let pageidx = PageIdx::must_new(u32::from(idx));
            let page = Page::test_filled(idx + SECOND_BATCH_FILL_OFFSET);
            writer_2.write_page(pageidx, page.clone()).unwrap();
            assert_eq!(writer_2.read_page(pageidx).unwrap(), page);
        }
        writer_2.commit().unwrap();

        settle();

        assert_eq!(status(&runtime, &graft), "1 r2");
        assert_eq!(status(&runtime_2, &graft_2), "1 r2");

        // Read the second batch back through the first runtime.
        let reader_after_sync = runtime.graft_reader(graft.clone()).unwrap();
        let second_batch_check = tokio_rt.spawn_blocking(move || {
            for idx in SECOND_BATCH {
                let pageidx = PageIdx::must_new(u32::from(idx));
                tracing::info!("checking page {pageidx}");
                let expected = Page::test_filled(idx + SECOND_BATCH_FILL_OFFSET);
                let actual = reader_after_sync.read_page(pageidx).unwrap();
                assert_eq!(expected, actual, "read unexpected page contents");
            }
        });
        tokio_rt.block_on(second_batch_check).unwrap();
    }
}
416}