tp_runtime/offchain/
storage_lock.rs

1// This file is part of Tetcore.
2
3// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! # Off-chain Storage Lock
19//!
20//! A storage-based lock with a defined expiry time.
21//!
//! The lock uses Local Storage and allows synchronizing access to a critical
//! section of your code between concurrently running Off-chain Workers. Using
//! the `PERSISTENT` variant of the storage keeps the lock value across full
//! node restarts and re-orgs.
26//!
//! A use case for the lock is to make sure that a particular section of the
//! code is only run by one Off-chain Worker at a time. This may include
//! performing a side effect (e.g. an HTTP call) or altering one or
//! multiple Local Storage entries.
31//!
//! Another use case is a collective update of multiple data items, or appending
//! to / removing from e.g. sets or vectors which are stored in the off-chain storage DB.
34//!
35//! ## Example:
36//!
37//! ```rust
38//! # use codec::{Decode, Encode, Codec};
39//! // in your off-chain worker code
40//! use tp_runtime::offchain::{
41//!		storage::StorageValueRef,
42//!		storage_lock::{StorageLock, Time},
43//! };
44//!
45//! fn append_to_in_storage_vec<'a, T>(key: &'a [u8], _: T) where T: Codec {
46//!    // `access::lock` defines the storage entry which is used for
47//!    // persisting the lock in the underlying database.
48//!    // The entry name _must_ be unique and can be interpreted as a
49//!    // unique mutex instance reference tag.
50//!    let mut lock = StorageLock::<Time>::new(b"access::lock");
51//!    {
52//!         let _guard = lock.lock();
53//!         let acc = StorageValueRef::persistent(key);
54//!         let v: Vec<T> = acc.get::<Vec<T>>().unwrap().unwrap();
55//!         // modify `v` as desired
56//!         // i.e. perform some heavy computation with
57//!         // side effects that should only be done once.
58//!         acc.set(&v);
59//!         // drop `_guard` implicitly at end of scope
60//!    }
61//! }
62//! ```
63
64use crate::offchain::storage::StorageValueRef;
65use crate::traits::AtLeast32BitUnsigned;
66use codec::{Codec, Decode, Encode};
67use tet_core::offchain::{Duration, Timestamp};
68use tet_io::offchain;
69
70/// Default expiry duration for time based locks in milliseconds.
71const STORAGE_LOCK_DEFAULT_EXPIRY_DURATION: Duration = Duration::from_millis(20_000);
72
73/// Default expiry duration for block based locks in blocks.
74const STORAGE_LOCK_DEFAULT_EXPIRY_BLOCKS: u32 = 4;
75
76/// Time between checks if the lock is still being held in milliseconds.
77const STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MIN: Duration = Duration::from_millis(100);
78const STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MAX: Duration = Duration::from_millis(10);
79
80/// Lockable item for use with a persisted storage lock.
81///
82/// Bound for an item that has a stateful ordered meaning
83/// without explicitly requiring `Ord` trait in general.
84pub trait Lockable: Sized {
85	/// An instant type specifying i.e. a point in time.
86	type Deadline: Sized + Codec + Clone;
87
88	/// Calculate the deadline based on a current state
89	/// such as time or block number and derives the deadline.
90	fn deadline(&self) -> Self::Deadline;
91
92	/// Verify the deadline has not expired compared to the
93	/// current state, i.e. time or block number.
94	fn has_expired(deadline: &Self::Deadline) -> bool;
95
96	/// Snooze at least until `deadline` is reached.
97	///
98	/// Note that `deadline` is only passed to allow optimizations
99	/// for `Lockables` which have a time based component.
100	fn snooze(_deadline: &Self::Deadline) {
101		tet_io::offchain::sleep_until(
102			offchain::timestamp().add(STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MAX),
103		);
104	}
105}
106
107/// Lockable based on the current timestamp with a configurable expiration time.
108#[derive(Encode, Decode)]
109pub struct Time {
110	/// How long the lock will stay valid once `fn lock(..)` or
111	/// `fn try_lock(..)` successfully acquire a lock.
112	expiration_duration: Duration,
113}
114
115impl Default for Time {
116	fn default() -> Self {
117		Self {
118			expiration_duration: STORAGE_LOCK_DEFAULT_EXPIRY_DURATION,
119		}
120	}
121}
122
123impl Lockable for Time {
124	type Deadline = Timestamp;
125
126	fn deadline(&self) -> Self::Deadline {
127		offchain::timestamp().add(self.expiration_duration)
128	}
129
130	fn has_expired(deadline: &Self::Deadline) -> bool {
131		offchain::timestamp() > *deadline
132	}
133
134	fn snooze(deadline: &Self::Deadline) {
135		let now = offchain::timestamp();
136		let remainder: Duration = now.diff(&deadline);
137		// do not snooze the full duration, but instead snooze max 100ms
138		// it might get unlocked in another thread
139		use core::cmp::{max, min};
140		let snooze = max(
141			min(remainder, STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MAX),
142			STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MIN,
143		);
144		tet_io::offchain::sleep_until(now.add(snooze));
145	}
146}
147
148/// A deadline based on block number and time.
149#[derive(Encode, Decode, Eq, PartialEq)]
150pub struct BlockAndTimeDeadline<B: BlockNumberProvider> {
151	/// The block number until which the lock is still valid _at least_.
152	pub block_number: <B as BlockNumberProvider>::BlockNumber,
153	/// The timestamp until which the lock is still valid _at least_.
154	pub timestamp: Timestamp,
155}
156
157impl<B: BlockNumberProvider> Clone for BlockAndTimeDeadline<B> {
158	fn clone(&self) -> Self {
159		Self {
160			block_number: self.block_number.clone(),
161			timestamp: self.timestamp.clone(),
162		}
163	}
164}
165
166impl<B: BlockNumberProvider> Default for BlockAndTimeDeadline<B> {
167	/// Provide the current state of block number and time.
168	fn default() -> Self {
169		Self {
170			block_number: B::current_block_number() + STORAGE_LOCK_DEFAULT_EXPIRY_BLOCKS.into(),
171			timestamp: offchain::timestamp().add(STORAGE_LOCK_DEFAULT_EXPIRY_DURATION),
172		}
173	}
174}
175
176/// Lockable based on block number and timestamp.
177///
178/// Expiration is defined if both, block number _and_ timestamp
179/// expire.
180pub struct BlockAndTime<B: BlockNumberProvider> {
181	/// Relative block number offset, which is used to determine
182	/// the block number part of the deadline.
183	expiration_block_number_offset: u32,
184	/// Relative duration, used to derive the time based part of
185	/// the deadline.
186	expiration_duration: Duration,
187
188	_phantom: core::marker::PhantomData<B>,
189}
190
191impl<B: BlockNumberProvider> Default for BlockAndTime<B> {
192	fn default() -> Self {
193		Self {
194			expiration_block_number_offset: STORAGE_LOCK_DEFAULT_EXPIRY_BLOCKS,
195			expiration_duration: STORAGE_LOCK_DEFAULT_EXPIRY_DURATION,
196			_phantom: core::marker::PhantomData::<B>,
197		}
198	}
199}
200
201// derive not possible, since `B` does not necessarily implement `trait Clone`
202impl<B: BlockNumberProvider> Clone for BlockAndTime<B> {
203	fn clone(&self) -> Self {
204		Self {
205			expiration_block_number_offset: self.expiration_block_number_offset.clone(),
206			expiration_duration: self.expiration_duration,
207			_phantom: core::marker::PhantomData::<B>,
208		}
209	}
210}
211
212impl<B: BlockNumberProvider> Lockable for BlockAndTime<B> {
213	type Deadline = BlockAndTimeDeadline<B>;
214
215	fn deadline(&self) -> Self::Deadline {
216		let block_number = <B as BlockNumberProvider>::current_block_number()
217			+ self.expiration_block_number_offset.into();
218		BlockAndTimeDeadline {
219			timestamp: offchain::timestamp().add(self.expiration_duration),
220			block_number,
221		}
222	}
223
224	fn has_expired(deadline: &Self::Deadline) -> bool {
225		offchain::timestamp() > deadline.timestamp
226			&& <B as BlockNumberProvider>::current_block_number() > deadline.block_number
227	}
228
229	fn snooze(deadline: &Self::Deadline) {
230		let now = offchain::timestamp();
231		let remainder: Duration = now.diff(&(deadline.timestamp));
232		use core::cmp::{max, min};
233		let snooze = max(
234			min(remainder, STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MAX),
235			STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MIN,
236		);
237		tet_io::offchain::sleep_until(now.add(snooze));
238	}
239}
240
241/// Storage based lock.
242///
243/// A lock that is persisted in the DB and provides the ability to guard against
244/// concurrent access in an off-chain worker, with a defined expiry deadline
245/// based on the concrete [`Lockable`](Lockable) implementation.
246pub struct StorageLock<'a, L = Time> {
247	// A storage value ref which defines the DB entry representing the lock.
248	value_ref: StorageValueRef<'a>,
249	lockable: L,
250}
251
252impl<'a, L: Lockable + Default> StorageLock<'a, L> {
253	/// Create a new storage lock with a `default()` instance of type `L`.
254	pub fn new(key: &'a [u8]) -> Self {
255		Self::with_lockable(key, Default::default())
256	}
257}
258
259impl<'a, L: Lockable> StorageLock<'a, L> {
260	/// Create a new storage lock with an explicit instance of a lockable `L`.
261	pub fn with_lockable(key: &'a [u8], lockable: L) -> Self {
262		Self {
263			value_ref: StorageValueRef::<'a>::persistent(key),
264			lockable,
265		}
266	}
267
268	/// Extend active lock's deadline
269	fn extend_active_lock(&mut self) -> Result<<L as Lockable>::Deadline, ()> {
270		let res = self.value_ref.mutate(|s: Option<Option<L::Deadline>>| -> Result<<L as Lockable>::Deadline, ()> {
271			match s {
272				// lock is present and is still active, extend the lock.
273				Some(Some(deadline)) if !<L as Lockable>::has_expired(&deadline) =>
274					Ok(self.lockable.deadline()),
275				// other cases
276				_ => Err(()),
277			}
278		});
279		match res {
280			Ok(Ok(deadline)) => Ok(deadline),
281			Ok(Err(_)) => Err(()),
282			Err(e) => Err(e),
283		}
284	}
285
286	/// Internal lock helper to avoid lifetime conflicts.
287	fn try_lock_inner(
288		&mut self,
289		new_deadline: L::Deadline,
290	) -> Result<(), <L as Lockable>::Deadline> {
291		let res = self.value_ref.mutate(
292			|s: Option<Option<L::Deadline>>|
293			-> Result<<L as Lockable>::Deadline, <L as Lockable>::Deadline> {
294				match s {
295					// no lock set, we can safely acquire it
296					None => Ok(new_deadline),
297					// write was good, but read failed
298					Some(None) => Ok(new_deadline),
299					// lock is set, but it is expired. We can re-acquire it.
300					Some(Some(deadline)) if <L as Lockable>::has_expired(&deadline) =>
301						Ok(new_deadline),
302					// lock is present and is still active
303					Some(Some(deadline)) => Err(deadline),
304				}
305			},
306		);
307		match res {
308			Ok(Ok(_)) => Ok(()),
309			Ok(Err(deadline)) => Err(deadline),
310			Err(e) => Err(e),
311		}
312	}
313
314	/// A single attempt to lock using the storage entry.
315	///
316	/// Returns a lock guard on success, otherwise an error containing the
317	/// `<Self::Lockable>::Deadline` in for the currently active lock
318	/// by another task `Err(<L as Lockable>::Deadline)`.
319	pub fn try_lock(&mut self) -> Result<StorageLockGuard<'a, '_, L>, <L as Lockable>::Deadline> {
320		self.try_lock_inner(self.lockable.deadline())?;
321		Ok(StorageLockGuard::<'a, '_> { lock: Some(self) })
322	}
323
324	/// Repeated lock attempts until the lock is successfully acquired.
325	///
326	/// If one uses `fn forget(..)`, it is highly likely `fn try_lock(..)`
327	/// is the correct API to use instead of `fn lock(..)`, since that might
328	/// never unlock in the anticipated span i.e. when used with `BlockAndTime`
329	/// during a certain block number span.
330	pub fn lock(&mut self) -> StorageLockGuard<'a, '_, L> {
331		while let Err(deadline) = self.try_lock_inner(self.lockable.deadline()) {
332			L::snooze(&deadline);
333		}
334		StorageLockGuard::<'a, '_, L> { lock: Some(self) }
335	}
336
337	/// Explicitly unlock the lock.
338	fn unlock(&mut self) {
339		self.value_ref.clear();
340	}
341}
342
343/// RAII style guard for a lock.
344pub struct StorageLockGuard<'a, 'b, L: Lockable> {
345	lock: Option<&'b mut StorageLock<'a, L>>,
346}
347
348impl<'a, 'b, L: Lockable> StorageLockGuard<'a, 'b, L> {
349	/// Consume the guard but **do not** unlock the underlying lock.
350	///
351	/// Can be used to implement a grace period after doing some
352	/// heavy computations and sending a transaction to be included
353	/// on-chain. By forgetting the lock, it will stay locked until
354	/// its expiration deadline is reached while the off-chain worker
355	/// can already exit.
356	pub fn forget(mut self) {
357		let _ = self.lock.take();
358	}
359
360	/// Extend the lock by guard deadline if it already exists.
361	///
362	/// i.e. large sets of items for which it is hard to calculate a
363	/// meaning full conservative deadline which does not block for a
364	/// very long time on node termination.
365	pub fn extend_lock(&mut self) -> Result<<L as Lockable>::Deadline, ()> {
366		if let Some(ref mut lock) = self.lock {
367			lock.extend_active_lock()
368		} else {
369			Err(())
370		}
371	}
372}
373
374impl<'a, 'b, L: Lockable> Drop for StorageLockGuard<'a, 'b, L> {
375	fn drop(&mut self) {
376		if let Some(lock) = self.lock.take() {
377			lock.unlock();
378		}
379	}
380}
381
382impl<'a> StorageLock<'a, Time> {
383	/// Explicitly create a time based storage lock with a non-default
384	/// expiration timeout.
385	pub fn with_deadline(key: &'a [u8], expiration_duration: Duration) -> Self {
386		Self {
387			value_ref: StorageValueRef::<'a>::persistent(key),
388			lockable: Time {
389				expiration_duration: expiration_duration,
390			},
391		}
392	}
393}
394
395impl<'a, B> StorageLock<'a, BlockAndTime<B>>
396where
397	B: BlockNumberProvider,
398{
399	/// Explicitly create a time and block number based storage lock with
400	/// a non-default expiration duration and block number offset.
401	pub fn with_block_and_time_deadline(
402		key: &'a [u8],
403		expiration_block_number_offset: u32,
404		expiration_duration: Duration,
405	) -> Self {
406		Self {
407			value_ref: StorageValueRef::<'a>::persistent(key),
408			lockable: BlockAndTime::<B> {
409				expiration_block_number_offset,
410				expiration_duration,
411				_phantom: core::marker::PhantomData,
412			},
413		}
414	}
415
416	/// Explicitly create a time and block number based storage lock with
417	/// the default expiration duration and a non-default block number offset.
418	pub fn with_block_deadline(key: &'a [u8], expiration_block_number_offset: u32) -> Self {
419		Self {
420			value_ref: StorageValueRef::<'a>::persistent(key),
421			lockable: BlockAndTime::<B> {
422				expiration_block_number_offset,
423				expiration_duration: STORAGE_LOCK_DEFAULT_EXPIRY_DURATION,
424				_phantom: core::marker::PhantomData,
425			},
426		}
427	}
428}
429
430/// Bound for a block number source
431/// used with [`BlockAndTime<BlockNumberProvider>`](BlockAndTime).
432pub trait BlockNumberProvider {
433	/// Type of `BlockNumber` to provide.
434	type BlockNumber: Codec + Clone + Ord + Eq + AtLeast32BitUnsigned;
435	/// Returns the current block number.
436	///
437	/// Provides an abstraction over an arbitrary way of providing the
438	/// current block number.
439	///
440	/// In case of using crate `tp_runtime` without the crate `fabric`
441	/// system, it is already implemented for
442	/// `fabric_system::Module<T: Config>` as:
443	///
444	/// ```ignore
445	/// fn current_block_number() -> Self {
446	///     fabric_system::Module<Config>::block_number()
447	/// }
448	/// ```
449	/// .
450	fn current_block_number() -> Self::BlockNumber;
451}
452
#[cfg(test)]
mod tests {
	use super::*;
	use tet_core::offchain::{testing, OffchainExt};
	use tet_io::TestExternalities;

	const VAL_1: u32 = 0u32;
	const VAL_2: u32 = 0xFFFF_FFFFu32;

	/// Locking twice in a row works, and dropping the guard removes the
	/// lock entry from the persistent storage.
	#[test]
	fn storage_lock_write_unlock_lock_read_unlock() {
		let (offchain_ext, state) = testing::TestOffchainExt::new();
		let mut ext = TestExternalities::default();
		ext.register_extension(OffchainExt::new(offchain_ext));

		ext.execute_with(|| {
			let mut lock = StorageLock::<'_, Time>::new(b"lock_1");
			let protected = StorageValueRef::persistent(b"protected_value");

			// one lock / write / read / unlock round per value
			for expected in &[VAL_1, VAL_2] {
				let _guard = lock.lock();

				protected.set(expected);

				assert_eq!(protected.get::<u32>(), Some(Some(*expected)));
				// `_guard` is dropped here and releases the lock
			}
		});
		// lock must have been cleared at this point
		assert_eq!(state.read().persistent_storage.get(b"lock_1"), None);
	}

	/// A forgotten guard leaves the lock entry in the persistent storage.
	#[test]
	fn storage_lock_and_forget() {
		let (offchain_ext, state) = testing::TestOffchainExt::new();
		let mut ext = TestExternalities::default();
		ext.register_extension(OffchainExt::new(offchain_ext));

		ext.execute_with(|| {
			let mut lock = StorageLock::<'_, Time>::new(b"lock_2");
			let protected = StorageValueRef::persistent(b"protected_value");

			let guard = lock.lock();

			protected.set(&VAL_1);
			assert_eq!(protected.get::<u32>(), Some(Some(VAL_1)));

			guard.forget();
		});
		// the guard was forgotten, so the lock entry must still be present
		let entry = state.read().persistent_storage.get(b"lock_2");
		assert!(entry.is_some());
	}

	/// A forgotten lock can be acquired again once its deadline has passed.
	#[test]
	fn storage_lock_and_let_expire_and_lock_again() {
		let (offchain_ext, state) = testing::TestOffchainExt::new();
		let mut ext = TestExternalities::default();
		ext.register_extension(OffchainExt::new(offchain_ext));

		ext.execute_with(|| {
			let wake_up_at = offchain::timestamp().add(Duration::from_millis(500));
			let lock_expiration = Duration::from_millis(200);

			let mut first = StorageLock::<'_, Time>::with_deadline(b"lock_3", lock_expiration);
			// acquire the lock and leave it behind, only expiry can free it now
			first.lock().forget();

			// assure the lock expires
			offchain::sleep_until(wake_up_at);

			let mut second = StorageLock::<'_, Time>::new(b"lock_3");
			let res = second.try_lock();
			assert!(res.is_ok());
			res.unwrap().forget();
		});

		// the re-acquired lock was forgotten as well, so its entry is still present
		let entry = state.read().persistent_storage.get(b"lock_3");
		assert!(entry.is_some());
	}

	/// An active lock can be extended, an expired one can not.
	#[test]
	fn extend_active_lock() {
		let (offchain_ext, state) = testing::TestOffchainExt::new();
		let mut ext = TestExternalities::default();
		ext.register_extension(OffchainExt::new(offchain_ext));

		ext.execute_with(|| {
			let lock_expiration = Duration::from_millis(300);
			let nap = Duration::from_millis(200);

			let mut lock = StorageLock::<'_, Time>::with_deadline(b"lock_4", lock_expiration);
			let mut guard = lock.lock();

			// sleep_until < lock_expiration
			offchain::sleep_until(offchain::timestamp().add(nap));

			// the lock is still active, extend it successfully
			assert!(guard.extend_lock().is_ok());

			// sleep_until < deadline
			offchain::sleep_until(offchain::timestamp().add(nap));

			// the lock is still active, try_lock will fail
			let mut contender = StorageLock::<'_, Time>::with_deadline(b"lock_4", lock_expiration);
			let res = contender.try_lock();
			assert!(res.is_err());

			// sleep again until sleep_until > deadline
			offchain::sleep_until(offchain::timestamp().add(nap));

			// the lock has expired, failed to extend it
			assert!(guard.extend_lock().is_err());
			guard.forget();

			// try_lock will succeed
			let mut successor = StorageLock::<'_, Time>::with_deadline(b"lock_4", lock_expiration);
			let res = successor.try_lock();
			assert!(res.is_ok());
			res.unwrap().forget();
		});

		// the last guard was forgotten, so the entry still holds its deadline
		let entry = state.read().persistent_storage.get(b"lock_4");
		assert_eq!(entry.unwrap(), vec![132_u8, 3u8, 0, 0, 0, 0, 0, 0]); // 132 + 256 * 3 = 900
	}
}