1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use super::*;
impl_veilid_log_facility!("stor");
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OfflineSubkeyWrite {
/// Safety selection to use when writing this record to the network
pub safety_selection: SafetySelection,
/// The subkeys that are queued up needing to be sent to the network in the background
pub subkeys: ValueSubkeyRangeSet,
/// The subkeys currently being sent to the network in the background
#[serde(default)]
pub subkeys_in_flight: ValueSubkeyRangeSet,
}
impl StorageManager {
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "stor", skip_all)
)]
pub(super) fn add_offline_subkey_write(
&self,
opaque_record_key: OpaqueRecordKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
) {
self.inner
.lock()
.offline_subkey_writes
.entry(opaque_record_key)
.and_modify(|x| {
x.subkeys.insert(subkey);
})
.or_insert(OfflineSubkeyWrite {
safety_selection,
subkeys: ValueSubkeyRangeSet::single(subkey),
subkeys_in_flight: ValueSubkeyRangeSet::new(),
});
}
/// If we write to a subkey, we first clear out any existing offline write to that subkey.
/// If the new write succeeds, then this stays cleared
/// If the new write was offline, then it will add it back in
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "stor", skip_all)
)]
pub(super) fn remove_offline_subkey_write_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
) {
// Get the offline subkey write record
match inner.offline_subkey_writes.entry(opaque_record_key.clone()) {
hashlink::linked_hash_map::Entry::Occupied(mut o) => {
let finished = {
let osw = o.get_mut();
// Remove the subkey from the list if it is there
osw.subkeys.remove(subkey);
// If we have no new work to do, and not still doing work, then this record is done
osw.subkeys.is_empty() && osw.subkeys_in_flight.is_empty()
};
if finished {
veilid_log!(self debug "Dropped offline write {}", opaque_record_key);
o.remove();
}
}
hashlink::linked_hash_map::Entry::Vacant(_) => {}
}
}
/// When we finish a offline subkey write, we mark subkeys as no longer in-flight
/// and if we didn't finish all the subkeys they are returned to the list of offline subkeys
/// so we can try again later. If the data associated with the write is no longer necessary
/// we can drop it.
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "stor", skip_all)
)]
pub(super) fn finish_offline_subkey_writes(
&self,
opaque_record_key: &OpaqueRecordKey,
subkeys_written: ValueSubkeyRangeSet,
subkeys_still_offline: ValueSubkeyRangeSet,
) {
assert!(
subkeys_written.is_disjoint(&subkeys_still_offline),
"subkeys can not be written and still offline"
);
// Get the offline subkey write record
match self
.inner
.lock()
.offline_subkey_writes
.entry(opaque_record_key.clone())
{
hashlink::linked_hash_map::Entry::Occupied(mut o) => {
let finished = {
let osw = o.get_mut();
// Now any left over are still offline, so merge them with any subkeys that have been added while we were working
osw.subkeys = osw.subkeys.union(&subkeys_still_offline);
// Remove subkeys that were successfully written from in_flight status
osw.subkeys_in_flight = osw.subkeys_in_flight.difference(&subkeys_written);
// If we have no new work to do, and not still doing work, then this record is done
osw.subkeys.is_empty() && osw.subkeys_in_flight.is_empty()
};
if finished {
veilid_log!(self debug "offline subkey write finished key {}", opaque_record_key);
o.remove();
}
}
hashlink::linked_hash_map::Entry::Vacant(_) => {
veilid_log!(self warn "can't finish missing offline subkey write: ignoring key {}", opaque_record_key);
}
}
}
}