1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
use super::*;
impl StorageManager {
/// Open an existing local record if it exists, and if it doesnt exist locally, try to pull it from the network and open it and return the opened descriptor
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "stor", skip_all)
)]
pub async fn open_record(
&self,
record_key: RecordKey,
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
let opaque_record_key = record_key.opaque();
let record_lock = self
.record_lock_table
.lock_record(
opaque_record_key.clone(),
StorageManagerRecordLockPurpose::Open,
)
.await;
// See if we have a local record already or not
if let Some(res) = self.open_existing_record_locked(
&record_lock,
record_key.clone(),
writer.clone(),
safety_selection.clone(),
)? {
// We had an existing record, so check the network to see if we should
// update it with what we have here
let set_consensus = self.config().network.dht.set_value_count as usize;
self.add_rehydration_request(
opaque_record_key,
ValueSubkeyRangeSet::full(),
set_consensus,
);
return Ok(res);
}
// No record yet, try to get it from the network
if !self.dht_is_online() {
apibail_try_again!("offline, try again later");
};
// Inspecting only subkey 0 gets the descriptor for the record but
// minimizes the number of subkeys we wait for from the network in the event that
// the record has many subkeys that have not yet been written to
// This is a bit of a hack because in theory other subkeys besides 0 could have been
// written to, but subkey 0 is the most likely to have been written to first
// Also, we know subkey 0 must exist, and if we don't have a schema, the only other alternative
// is a ValueSubkeyRangeSet::full() which would be more like a transact_dht_record in terms of wait time
// No last descriptor, no last value. Use the safety selection we opened the record with.
let outbound_inspect_result = self
.outbound_inspect_value(
&opaque_record_key,
ValueSubkeyRangeSet::single(0),
safety_selection.clone(),
InspectResult::default(),
false,
)
.await?;
// If we got nothing back, the key wasn't found
if outbound_inspect_result
.inspect_result
.opt_descriptor()
.is_none()
{
// No result
apibail_key_not_found!(opaque_record_key);
};
// Open the new record
self.open_new_record_locked(
&record_lock,
record_key,
writer,
outbound_inspect_result,
safety_selection,
)
.await
}
////////////////////////////////////////////////////////////////////////
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "stor", skip_all, err)
)]
pub(super) fn open_existing_record_locked(
&self,
record_lock: &StorageManagerRecordLockGuard,
record_key: RecordKey,
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<Option<DHTRecordDescriptor>> {
let opaque_record_key = record_lock.record();
if record_key.opaque() != opaque_record_key {
apibail_internal!("wrong record lock");
}
// Get local record store
let local_record_store = self.get_local_record_store()?;
// See if we have a local record already or not
let cb = |descriptor: Arc<SignedValueDescriptor>, r: &mut LocalRecordDetail| {
// Process local record
// Keep the safety selection we opened the record with
r.safety_selection = safety_selection.clone();
// Return record details
(descriptor.owner(), descriptor.schema().unwrap_or_log())
};
let (owner, schema) =
match local_record_store.with_record_detail_mut(&opaque_record_key, cb)? {
Some(v) => v,
None => {
return Ok(None);
}
};
// Had local record
// If the writer we chose is also the owner, we have the owner secret
// Otherwise this is just another subkey writer
let owner_secret = if let Some(writer) = writer.clone() {
if writer.key() == owner {
Some(writer.secret())
} else {
None
}
} else {
None
};
let crypto = self.crypto();
let mut crypto_with_key: Option<(CryptoSystemGuard, BareSharedSecret)> = None;
if let Some(k) = record_key.ref_value().encryption_key() {
let Some(value_crypto) = crypto.get(record_key.kind()) else {
apibail_generic!("unsupported cryptosystem for record encryption key");
};
crypto_with_key = Some((value_crypto, k));
}
// Write open record
self.inner
.lock()
.opened_records
.entry(opaque_record_key)
.and_modify(|e| {
e.set_writer(writer.clone());
e.set_safety_selection(safety_selection.clone());
e.set_encryption_key(crypto_with_key.as_ref().map(|(_, k)| k.clone()));
})
.or_insert_with(|| {
OpenedRecord::new(
writer.clone(),
safety_selection.clone(),
crypto_with_key.map(|(_, k)| k),
)
});
// Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor::new(record_key, owner, owner_secret, schema);
Ok(Some(descriptor))
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "stor", skip_all, err)
)]
pub(super) async fn open_new_record_locked(
&self,
record_lock: &StorageManagerRecordLockGuard,
record_key: RecordKey,
writer: Option<KeyPair>,
outbound_inspect_result: OutboundInspectValueResult,
safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> {
let opaque_record_key = record_lock.record();
if record_key.opaque() != opaque_record_key {
apibail_internal!("wrong record lock");
}
let local_record_store = self.get_local_record_store()?;
// Ensure the record is closed
if self
.inner
.lock()
.opened_records
.contains_key(&opaque_record_key)
{
apibail_internal!(
"new record {} should never be opened at this point",
opaque_record_key
);
}
// Must have descriptor
let Some(signed_value_descriptor) = outbound_inspect_result.inspect_result.opt_descriptor()
else {
// No descriptor for new record, can't store this
apibail_generic!("no descriptor");
};
// Get owner
let owner = signed_value_descriptor.owner();
// If the writer we chose is also the owner, we have the owner secret
// Otherwise this is just another subkey writer
let owner_secret = if let Some(writer) = &writer {
if writer.key() == owner {
Some(writer.secret())
} else {
None
}
} else {
None
};
let schema = signed_value_descriptor.schema()?;
// Make and store a new record for this descriptor
let record = Record::<LocalRecordDetail>::new(
Timestamp::now(),
signed_value_descriptor,
LocalRecordDetail::new(safety_selection.clone()),
)?;
local_record_store
.new_record(opaque_record_key.clone(), record)
.await?;
let encryption_key = record_key.ref_value().encryption_key();
// Write open record
self.inner.lock().opened_records.insert(
opaque_record_key.clone(),
OpenedRecord::new(writer, safety_selection, encryption_key),
);
// Keep the list of nodes that returned a value for later reference
let results_iter = outbound_inspect_result
.inspect_result
.subkeys()
.iter()
.map(ValueSubkeyRangeSet::single)
.zip(outbound_inspect_result.subkey_fanout_results.into_iter());
let existed = self.process_fanout_results(
opaque_record_key.clone(),
results_iter,
false,
self.config().network.dht.consensus_width as usize,
)?;
if !existed {
apibail_internal!(
"record was locked for open new record but is now missing: {}",
opaque_record_key
);
}
// Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor::new(record_key, owner, owner_secret, schema);
Ok(descriptor)
}
}