samod_core/loader.rs
1use crate::{
2 PeerId, StorageId, StorageKey, UnixTimestamp,
3 actors::hub::{Hub, State as HubState},
4 ephemera::EphemeralSession,
5 io::{IoResult, IoTask, IoTaskId, StorageResult, StorageTask},
6};
7
8/// A state machine for loading a samod repository.
9///
10/// `SamodLoader` handles the initialization phase of a samod repository,
11/// coordinating between the user and the driver to load or generate the storage ID
12/// and perform any other setup operations required before the repository can be used.
13///
14/// ## Usage
15///
16/// ```rust,no_run
17/// use samod_core::{PeerId, SamodLoader, LoaderState, UnixTimestamp, io::{StorageResult, IoResult}};
18/// use rand::SeedableRng;
19///
20/// let mut rng = rand::rngs::StdRng::from_rng(&mut rand::rng());
21/// let mut loader = SamodLoader::new(PeerId::from("test"));
22///
23/// loop {
24/// match loader.step(&mut rng, UnixTimestamp::now()) {
25/// LoaderState::NeedIo(tasks) => {
26/// // Execute IO tasks and provide results
27/// for task in tasks {
28/// // ... execute task ...
29/// # let result: IoResult<StorageResult> = todo!();
30/// loader.provide_io_result(result);
31/// }
32/// }
33/// LoaderState::Loaded(samod) => {
34/// // Repository is loaded and ready to use
35/// break;
36/// }
37/// }
38/// }
39/// ```
40pub struct SamodLoader {
41 local_peer_id: PeerId,
42 state: State,
43}
44
45/// The current state of the loader.
46pub enum LoaderState {
47 /// The loader needs IO operations to be performed.
48 ///
49 /// The caller should execute all provided IO tasks and call
50 /// `provide_io_result` for each completed task, then call `step` again.
51 NeedIo(Vec<IoTask<StorageTask>>),
52
53 /// Loading is complete and the samod repository is ready to use.
54 Loaded(Box<Hub>),
55}
56
57enum State {
58 Starting,
59 LoadingStorageId(IoTaskId),
60 StorageIdLoaded(Option<Vec<u8>>),
61 PuttingStorageId(IoTaskId, StorageId),
62 Done(StorageId),
63}
64
65impl SamodLoader {
66 /// Creates a new samod loader.
67 ///
68 /// # Arguments
69 ///
70 /// * `now` - The current timestamp for initialization
71 ///
72 /// # Returns
73 ///
74 /// A new `SamodLoader` ready to begin the loading process.
75 pub fn new(local_peer_id: PeerId) -> Self {
76 Self {
77 local_peer_id,
78 state: State::Starting,
79 }
80 }
81
82 /// Advances the loader state machine.
83 ///
84 /// This method should be called repeatedly until `LoaderState::Loaded` is returned.
85 /// When `LoaderState::NeedIo` is returned, the caller must execute the provided
86 /// IO tasks and call `provide_io_result` for each one before calling `step` again.
87 ///
88 /// # Arguments
89 ///
90 /// * `now` - The current timestamp
91 ///
92 /// # Returns
93 ///
94 /// The current state of the loader.
95 pub fn step<R: rand::Rng>(&mut self, rng: &mut R, _now: UnixTimestamp) -> LoaderState {
96 match &self.state {
97 State::Starting => {
98 let task = IoTask::new(StorageTask::Load {
99 key: StorageKey::storage_id_path(),
100 });
101 self.state = State::LoadingStorageId(task.task_id);
102 LoaderState::NeedIo(vec![task])
103 }
104 State::LoadingStorageId(_task_id) => LoaderState::NeedIo(Vec::new()),
105 State::StorageIdLoaded(result) => {
106 if let Some(result) = result {
107 match String::from_utf8(result.to_vec()) {
108 Ok(s) => {
109 let storage_id = StorageId::from(s);
110 self.state = State::Done(storage_id.clone());
111 let state = HubState::new(
112 storage_id,
113 self.local_peer_id.clone(),
114 EphemeralSession::new(rng),
115 );
116 return LoaderState::Loaded(Box::new(Hub::new(state)));
117 }
118 Err(_e) => {
119 tracing::warn!("storage ID was not a valid string, creating a new one");
120 }
121 }
122 } else {
123 tracing::info!("no storage ID found, generating a new one");
124 }
125 let storage_id = StorageId::new(rng);
126 let task = IoTask::new(StorageTask::Put {
127 key: StorageKey::storage_id_path(),
128 value: storage_id.as_str().as_bytes().to_vec(),
129 });
130 self.state = State::PuttingStorageId(task.task_id, storage_id);
131 LoaderState::NeedIo(vec![task])
132 }
133 State::PuttingStorageId(_task_id, _storage_id) => LoaderState::NeedIo(Vec::new()),
134 State::Done(storage_id) => {
135 let state = HubState::new(
136 storage_id.clone(),
137 self.local_peer_id.clone(),
138 EphemeralSession::new(rng),
139 );
140 LoaderState::Loaded(Box::new(Hub::new(state)))
141 }
142 }
143 }
144
145 /// Provides the result of an IO operation.
146 ///
147 /// This method should be called for each IO task that was returned by `step`.
148 /// The loader passes the result directly to the driver for processing.
149 ///
150 /// # Arguments
151 ///
152 /// * `result` - The result of executing an IO task
153 pub fn provide_io_result(&mut self, result: IoResult<StorageResult>) {
154 match self.state {
155 State::Starting | State::Done(_) | State::StorageIdLoaded(_) => {
156 panic!("unexpected IO completion");
157 }
158 State::LoadingStorageId(io_task_id) => {
159 if io_task_id != result.task_id {
160 panic!(
161 "unexpected task ID: expected {:?}, got {:?}",
162 io_task_id, result.task_id
163 );
164 }
165 match result.payload {
166 StorageResult::Load { value } => {
167 self.state = State::StorageIdLoaded(value);
168 }
169 _ => panic!("unexpected storage result when loading storage ID"),
170 }
171 }
172 State::PuttingStorageId(io_task_id, ref storage_id) => {
173 if io_task_id != result.task_id {
174 panic!(
175 "unexpected task ID: expected {:?}, got {:?}",
176 io_task_id, result.task_id
177 );
178 }
179 match result.payload {
180 StorageResult::Put => self.state = State::Done(storage_id.clone()),
181 _ => panic!("unexpected storage result when putting storage ID"),
182 }
183 }
184 }
185 }
186}