Skip to main content

veilid_core/veilid_api/
routing_context.rs

1use super::*;
2
3impl_veilid_log_facility!("veilid_api");
4
5///////////////////////////////////////////////////////////////////////////////////////
6
7/// Valid destinations for a message sent over a routing context.
8#[derive(
9    Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema,
10)]
11#[cfg_attr(
12    all(target_arch = "wasm32", target_os = "unknown"),
13    derive(Tsify),
14    tsify(from_wasm_abi, into_wasm_abi, namespace)
15)]
16#[must_use]
17pub enum Target {
18    /// Node by its node id
19    #[schemars(with = "String")]
20    NodeId(NodeId),
21    /// Remote private route by its id.
22    #[schemars(with = "String")]
23    RouteId(RouteId),
24}
25
26pub(crate) struct RoutingContextUnlockedInner {
27    /// Safety routing requirements.
28    safety_selection: SafetySelection,
29}
30
31/// Routing contexts are the way you specify the communication preferences for Veilid.
32///
33/// By default routing contexts have 'safety routing' enabled which offers sender privacy.
34/// privacy. To disable this and send RPC operations straight from the node use [RoutingContext::with_safety()] with a [SafetySelection::Unsafe] parameter.
35/// To enable receiver privacy, you should send to a private route RouteId that you have imported, rather than directly to a NodeId.
36#[derive(Clone)]
37#[must_use]
38pub struct RoutingContext {
39    /// Veilid API handle.
40    api: VeilidAPI,
41    unlocked_inner: Arc<RoutingContextUnlockedInner>,
42}
43
44impl fmt::Debug for RoutingContext {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        f.debug_struct("RoutingContext")
47            .field("ptr", &format!("{:p}", Arc::as_ptr(&self.unlocked_inner)))
48            .field("safety_selection", &self.unlocked_inner.safety_selection)
49            .finish()
50    }
51}
52
53impl RoutingContext {
54    ////////////////////////////////////////////////////////////////
55
56    pub(super) fn try_new(api: VeilidAPI) -> VeilidAPIResult<Self> {
57        let config = api.config()?;
58
59        Ok(Self {
60            api,
61            unlocked_inner: Arc::new(RoutingContextUnlockedInner {
62                safety_selection: SafetySelection::Safe(SafetySpec {
63                    preferred_route: None,
64                    hop_count: config.network.rpc.default_route_hop_count as usize,
65                    stability: Stability::Reliable,
66                    sequencing: Sequencing::PreferOrdered,
67                }),
68            }),
69        })
70    }
71
72    #[must_use]
73    pub(crate) fn log_key(&self) -> &str {
74        self.api.log_key()
75    }
76
77    /// Turn on sender privacy, enabling the use of safety routes. This is the default and
78    /// calling this function is only necessary if you have previously disable safety or used other parameters.
79    ///
80    /// Default values for hop count, stability and sequencing preferences are used.
81    ///
82    /// * Hop count default is dependent on config, but is set to 1 extra hop.
83    /// * Stability default is to choose reliable routes, preferring them over low latency.
84    /// * Sequencing default is to prefer ordered before unordered message delivery.
85    ///
86    /// To customize the safety selection in use, use [RoutingContext::with_safety()].
87    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
88    pub fn with_default_safety(self) -> VeilidAPIResult<Self> {
89        let this = self.clone();
90        record_duration(|| {
91            veilid_log!(self debug
92            "RoutingContext::with_default_safety(self: {:?})", self);
93
94            let config = self.api.config()?;
95
96            self.with_safety(SafetySelection::Safe(SafetySpec {
97                preferred_route: None,
98                hop_count: config.network.rpc.default_route_hop_count as usize,
99                stability: Stability::Reliable,
100                sequencing: Sequencing::PreferOrdered,
101            }))
102        })
103        .inspect_err(log_veilid_api_error!(this))
104    }
105
106    /// Use a custom [SafetySelection]. Can be used to disable safety via [SafetySelection::Unsafe].
107    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
108    pub fn with_safety(self, safety_selection: SafetySelection) -> VeilidAPIResult<Self> {
109        let this = self.clone();
110        record_duration(|| {
111            veilid_log!(self debug
112            "RoutingContext::with_safety(self: {:?}, safety_selection: {:?})", self, safety_selection);
113
114            if let SafetySelection::Unsafe(_) = &safety_selection {
115                #[cfg(not(feature = "footgun"))]
116                {
117                    return Err(VeilidAPIError::generic(
118                        "Unsafe routing mode is not allowed without the 'footgun' feature enabled",
119                    ));
120                }
121            }
122
123            if let SafetySelection::Safe(safe) = &safety_selection {
124                if let Some(preferred_route) = &safe.preferred_route {
125                    self.api
126                        .core_context()?
127                        .routing_table()
128                        .check_route_id(preferred_route)?;
129                }
130            }
131
132            Ok(Self {
133                api: self.api.clone(),
134                unlocked_inner: Arc::new(RoutingContextUnlockedInner { safety_selection }),
135            })
136        }).inspect_err(log_veilid_api_error!(this))
137    }
138
139    /// Use a specified [Sequencing] preference, with or without privacy.
140    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
141    pub fn with_sequencing(self, sequencing: Sequencing) -> Self {
142        record_duration(|| {
143            veilid_log!(self debug
144            "RoutingContext::with_sequencing(self: {:?}, sequencing: {:?})", self, sequencing);
145
146            Self {
147                api: self.api.clone(),
148                unlocked_inner: Arc::new(RoutingContextUnlockedInner {
149                    safety_selection: match &self.unlocked_inner.safety_selection {
150                        SafetySelection::Unsafe(_) => SafetySelection::Unsafe(sequencing),
151                        SafetySelection::Safe(safety_spec) => SafetySelection::Safe(SafetySpec {
152                            preferred_route: safety_spec.preferred_route.clone(),
153                            hop_count: safety_spec.hop_count,
154                            stability: safety_spec.stability,
155                            sequencing,
156                        }),
157                    },
158                }),
159            }
160        })
161    }
162
163    /// Get the safety selection in use on this routing context.
164    pub fn safety(&self) -> SafetySelection {
165        self.unlocked_inner.safety_selection.clone()
166    }
167
168    /// Get the sequencing used by this routing context
169    pub fn sequencing(&self) -> Sequencing {
170        match &self.unlocked_inner.safety_selection {
171            SafetySelection::Unsafe(sequencing) => *sequencing,
172            SafetySelection::Safe(safety_spec) => safety_spec.sequencing,
173        }
174    }
175
176    /// Get the [VeilidAPI] object that created this [RoutingContext].
177    pub fn api(&self) -> VeilidAPI {
178        self.api.clone()
179    }
180
181    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
182    async fn get_destination(&self, target: Target) -> VeilidAPIResult<rpc_processor::Destination> {
183        record_duration_fut(async {
184            veilid_log!(self debug
185                "RoutingContext::get_destination(self: {:?}, target: {:?})", self, target);
186
187            let rpc_processor = self.api.core_context()?.rpc_processor();
188            Box::pin(rpc_processor.resolve_target_to_destination(
189                target,
190                self.unlocked_inner.safety_selection.clone(),
191            ))
192            .await
193            .map_err(VeilidAPIError::invalid_target)
194        })
195        .await
196        .inspect_err(log_veilid_api_error!(self))
197    }
198
199    fn check_target(&self, target: &Target) -> VeilidAPIResult<()> {
200        match target {
201            Target::NodeId(node_id) => {
202                self.api
203                    .core_context()?
204                    .routing_table()
205                    .check_node_id(node_id)?;
206            }
207            Target::RouteId(route_id) => {
208                self.api
209                    .core_context()?
210                    .routing_table()
211                    .check_route_id(route_id)?;
212            }
213        }
214        Ok(())
215    }
216
217    ////////////////////////////////////////////////////////////////
218    // App-level Messaging
219
220    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", skip(message), fields(duration, __VEILID_LOG_KEY = self.log_key(), message_len = message.len(), ret.len)))]
221    async fn internal_app_call(
222        &self,
223        target: Target,
224        message: Vec<u8>,
225    ) -> VeilidAPIResult<Vec<u8>> {
226        record_duration_fut(async {
227            veilid_log!(self debug
228                "RoutingContext::app_call(self: {:?}, target: {:?}, message_len: {:?})", self, target, message.len());
229            veilid_log!(self trace "message: {:?}", message);
230
231            self.check_target(&target)?;
232
233            let rpc_processor = self.api.core_context()?.rpc_processor();
234
235            // Get destination
236            let dest = self.get_destination(target).await?;
237
238            // Send app message
239            let answer = match Box::pin(rpc_processor.rpc_call_app_call(dest, message)).await {
240                Ok(NetworkResult::Value(v)) => v,
241                Ok(NetworkResult::Timeout) => apibail_timeout!(),
242                Ok(NetworkResult::ServiceUnavailable(e)) => apibail_invalid_target!(e),
243                Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
244                    apibail_no_connection!(e);
245                }
246
247                Ok(NetworkResult::InvalidMessage(message)) => {
248                    apibail_generic!(message);
249                }
250                Err(e) => return Err(e.into()),
251            };
252
253            tracing::Span::current().record("ret.len", answer.answer.len());
254
255            Ok(answer.answer)
256        }).await.inspect_err(log_veilid_api_error!(self))
257    }
258
259    #[cfg(feature = "footgun")]
260    /// App-level bidirectional call that expects a response to be returned.
261    ///
262    /// Veilid apps may use this for arbitrary message passing.
263    ///
264    /// * `target` - can be either a direct node id or a private route.
265    /// * `message` - an arbitrary message blob of up to 32768 bytes.
266    ///
267    /// Returns an answer blob of up to 32768 bytes.
268    pub async fn app_call(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<Vec<u8>> {
269        self.internal_app_call(target, message).await
270    }
271
272    #[cfg(not(feature = "footgun"))]
273    /// App-level bidirectional call that expects a response to be returned.
274    ///
275    /// Veilid apps may use this for arbitrary message passing.
276    ///
277    /// * `target` - a private route id
278    /// * `message` - an arbitrary message blob of up to 32768 bytes.
279    ///
280    /// Returns an answer blob of up to 32768 bytes.
281    pub async fn app_call(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<Vec<u8>> {
282        match target {
283            Target::RouteId(_) => self.internal_app_call(target, message).await,
284            Target::NodeId(_) => Err(VeilidAPIError::invalid_target(
285                "Only RouteId targets are allowed without the footgun feature",
286            )),
287        }
288    }
289
290    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", skip(message), fields(duration, __VEILID_LOG_KEY = self.log_key(), message_len = message.len()), ret))]
291    async fn internal_app_message(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<()> {
292        record_duration_fut(async {
293            veilid_log!(self debug
294                "RoutingContext::app_message(self: {:?}, target: {:?}, message_len: {})", self, target, message.len());
295            veilid_log!(self trace "message: {:?}", message);
296
297            self.check_target(&target)?;
298
299            let rpc_processor = self.api.core_context()?.rpc_processor();
300
301            // Get destination
302            let dest = self.get_destination(target).await?;
303
304            // Send app message
305            match Box::pin(rpc_processor.rpc_call_app_message(dest, message)).await {
306                Ok(NetworkResult::Value(())) => {}
307                Ok(NetworkResult::Timeout) => apibail_timeout!(),
308                Ok(NetworkResult::ServiceUnavailable(e)) => apibail_invalid_target!(e),
309                Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
310                    apibail_no_connection!(e);
311                }
312                Ok(NetworkResult::InvalidMessage(message)) => {
313                    apibail_generic!(message);
314                }
315                Err(e) => return Err(e.into()),
316            };
317
318            Ok(())
319        }).await.inspect_err(log_veilid_api_error!(self))
320    }
321
322    #[cfg(feature = "footgun")]
323    /// App-level unidirectional message that does not expect any value to be returned.
324    ///
325    /// Veilid apps may use this for arbitrary message passing.
326    ///
327    /// * `target` - can be either a direct node id or a private route.
328    /// * `message` - an arbitrary message blob of up to 32768 bytes.
329    pub async fn app_message(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<()> {
330        self.internal_app_message(target, message).await
331    }
332
333    #[cfg(not(feature = "footgun"))]
334    /// App-level unidirectional message that does not expect any value to be returned.
335    ///
336    /// Veilid apps may use this for arbitrary message passing.
337    ///
338    /// * `target` - a private route.
339    /// * `message` - an arbitrary message blob of up to 32768 bytes.
340    pub async fn app_message(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<()> {
341        match target {
342            Target::RouteId(_) => self.internal_app_message(target, message).await,
343            Target::NodeId(_) => Err(VeilidAPIError::invalid_target(
344                "Only PrivateRoute targets are allowed without the footgun feature",
345            )),
346        }
347    }
348
349    ///////////////////////////////////
350    // DHT Records
351
352    /// Creates a new DHT record
353    ///
354    /// The record is considered 'open' after the create operation succeeds.
355    /// * 'kind' - specify a cryptosystem kind to use
356    /// * 'schema' - the schema to use when creating the DHT record
357    /// * 'owner' - optionally specify an owner keypair to use. If you leave this as None then a random one will be generated. If specified, the crypto kind of the owner must match that of the `kind` parameter
358    ///
359    /// Returns the newly allocated DHT record's key if successful.
360    /// Note: if you pass in an owner keypair this call is a deterministic! This means that if you try to create a new record for a given owner and schema that already exists it *will* fail.
361    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
362    pub async fn create_dht_record(
363        &self,
364        kind: CryptoKind,
365        schema: DHTSchema,
366        owner: Option<KeyPair>,
367    ) -> VeilidAPIResult<DHTRecordDescriptor> {
368        record_duration_fut(async {
369            veilid_log!(self debug
370                "RoutingContext::create_dht_record(self: {:?}, schema: {:?}, owner: {:?}, kind: {:?})", self, schema, owner, kind);
371            Crypto::validate_crypto_kind(kind)?;
372            schema.validate()?;
373            if let Some(owner) = &owner {
374                self.api.crypto()?.check_keypair(owner)?;
375            }
376
377            let storage_manager = self.api.core_context()?.storage_manager();
378            Box::pin(storage_manager.create_record(
379                kind,
380                schema,
381                owner,
382                self.unlocked_inner.safety_selection.clone(),
383            ))
384            .await
385        }).await.inspect_err(log_veilid_api_error!(self))
386    }
387
388    /// Opens a DHT record at a specific key.
389    ///
390    /// Associates a 'default_writer' secret if one is provided to provide writer capability. The
391    /// writer can be overridden if specified here via the set_dht_value writer.
392    ///
393    /// Records may only be opened or created. If a record is re-opened it will use the new writer and routing context
394    /// ignoring the settings of the last time it was opened. This allows one to open a record a second time
395    /// without first closing it, which will keep the active 'watches' on the record but change the default writer or
396    /// safety selection.
397    ///
398    /// Returns the DHT record descriptor for the opened record if successful.
399    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
400    pub async fn open_dht_record(
401        &self,
402        record_key: RecordKey,
403        default_writer: Option<KeyPair>,
404    ) -> VeilidAPIResult<DHTRecordDescriptor> {
405        record_duration_fut(async {
406            veilid_log!(self debug
407                "RoutingContext::open_dht_record(self: {:?}, key: {:?}, default_writer: {:?})", self, record_key, default_writer);
408
409            self.api
410                .core_context()?
411                .storage_manager()
412                .check_record_key(&record_key)?;
413            if let Some(default_writer) = &default_writer {
414                self.api.crypto()?.check_keypair(default_writer)?;
415            }
416
417            let storage_manager = self.api.core_context()?.storage_manager();
418            storage_manager
419                .open_record(
420                    record_key,
421                    default_writer,
422                    self.unlocked_inner.safety_selection.clone(),
423                )
424                .await
425        }).await.inspect_err(log_veilid_api_error!(self))
426    }
427
428    /// Closes a DHT record at a specific key that was opened with create_dht_record or open_dht_record.
429    ///
430    /// Closing a record allows you to re-open it with a different routing context.
431    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
432    pub async fn close_dht_record(&self, record_key: RecordKey) -> VeilidAPIResult<()> {
433        record_duration_fut(async {
434            veilid_log!(self debug
435                "RoutingContext::close_dht_record(self: {:?}, key: {:?})", self, record_key);
436
437            self.api
438                .core_context()?
439                .storage_manager()
440                .check_record_key(&record_key)?;
441
442            let storage_manager = self.api.core_context()?.storage_manager();
443            Box::pin(storage_manager.close_record(record_key)).await
444        })
445        .await
446        .inspect_err(log_veilid_api_error!(self))
447    }
448
449    /// Deletes a DHT record at a specific key.
450    ///
451    /// If the record is opened, it must be closed before it is deleted.
452    /// Deleting a record does not delete it from the network, but will remove the storage of the record
453    /// locally, and will prevent its value from being refreshed on the network by this node.
454    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
455    pub async fn delete_dht_record(&self, record_key: RecordKey) -> VeilidAPIResult<()> {
456        record_duration_fut(async {
457            veilid_log!(self debug
458                "RoutingContext::delete_dht_record(self: {:?}, key: {:?})", self, record_key);
459
460            self.api
461                .core_context()?
462                .storage_manager()
463                .check_record_key(&record_key)?;
464
465            let storage_manager = self.api.core_context()?.storage_manager();
466            Box::pin(storage_manager.delete_record(record_key)).await
467        })
468        .await
469        .inspect_err(log_veilid_api_error!(self))
470    }
471
472    /// Gets the latest value of a subkey.
473    ///
474    /// May pull the latest value from the network, but by setting 'force_refresh' you can force a network data refresh. Can only be used on opened records.
475    ///
476    /// Returns `None` if the value subkey has not yet been set.
477    /// Returns `Some(data)` if the value subkey has valid data.
478    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
479    pub async fn get_dht_value(
480        &self,
481        record_key: RecordKey,
482        subkey: ValueSubkey,
483        force_refresh: bool,
484    ) -> VeilidAPIResult<Option<ValueData>> {
485        record_duration_fut(async {
486            veilid_log!(self debug
487                "RoutingContext::get_dht_value(self: {:?}, key: {:?}, subkey: {:?}, force_refresh: {:?})", self, record_key, subkey, force_refresh);
488
489            self.api
490                .core_context()?
491                .storage_manager()
492                .check_record_key(&record_key)?;
493
494            let storage_manager = self.api.core_context()?.storage_manager();
495            Box::pin(storage_manager.get_value(record_key, subkey, force_refresh)).await
496        }).await.inspect_err(log_veilid_api_error!(self))
497    }
498
499    /// Pushes a changed subkey value to the network.
500    /// The DHT record must first by opened via open_dht_record or create_dht_record.
501    ///
502    /// The writer, if specified, will override the 'default_writer' specified when the record is opened.
503    ///
504    /// Returns `None` if the value was successfully set.
505    /// Returns `Some(data)` if the value set was older than the one available on the network.
506    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", skip(data), fields(duration, __VEILID_LOG_KEY = self.log_key(), data = print_data(&data, Some(64))), ret))]
507    pub async fn set_dht_value(
508        &self,
509        record_key: RecordKey,
510        subkey: ValueSubkey,
511        data: Vec<u8>,
512        options: Option<SetDHTValueOptions>,
513    ) -> VeilidAPIResult<Option<ValueData>> {
514        record_duration_fut(async {
515            veilid_log!(self debug
516                "RoutingContext::set_dht_value(self: {:?}, key: {:?}, subkey: {:?}, data: len={}, options: {:?})", self, record_key, subkey, data.len(), options);
517
518            self.api
519                .core_context()?
520                .storage_manager()
521                .check_record_key(&record_key)?;
522
523            let storage_manager = self.api.core_context()?.storage_manager();
524            Box::pin(storage_manager.set_value(record_key, subkey, data, options)).await
525        }).await.inspect_err(log_veilid_api_error!(self))
526    }
527
528    /// Add or update a watch to a DHT value that informs the user via an VeilidUpdate::ValueChange callback when the record has subkeys change.
529    /// One remote node will be selected to perform the watch and it will offer an expiration time based on a suggestion, and make an attempt to
530    /// continue to report changes via the callback. Nodes that agree to doing watches will be put on our 'ping' list to ensure they are still around
531    /// otherwise the watch will be cancelled and will have to be re-watched.  Can only be used on opened records.
532    ///
533    /// There is only one watch permitted per record. If a change to a watch is desired, the previous one will be overwritten.
534    /// * `key` is the record key to watch. it must first be opened for reading or writing.
535    /// * `subkeys`:
536    ///   - None: specifies watching the entire range of subkeys.
537    ///   - Some(range): is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys.
538    /// * `expiration`:
539    ///   - None: specifies a watch with no expiration
540    ///   - Some(timestamp): the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.
541    /// * `count:
542    ///   - None: specifies a watch count of u32::MAX
543    ///   - Some(count): is the number of times the watch will be sent, maximum. A zero value here is equivalent to a cancellation.
544    ///
545    /// Returns Ok(true) if a watch is active for this record.
546    /// Returns Ok(false) if the entire watch has been cancelled.
547    ///
548    /// DHT watches are accepted with the following conditions:
549    /// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record.
550    /// * If a member (either the owner or a SMPL schema member) has opened the key for writing (even if no writing is performed) then the watch will be signed and guaranteed network.dht.member_watch_limit per writer.
551    ///
552    /// Members can be specified via the SMPL schema and do not need to allocate writable subkeys in order to offer a member watch capability.
553    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
554    pub async fn watch_dht_values(
555        &self,
556        record_key: RecordKey,
557        subkeys: Option<ValueSubkeyRangeSet>,
558        expiration: Option<Timestamp>,
559        count: Option<u32>,
560    ) -> VeilidAPIResult<bool> {
561        record_duration_fut(async {
562            veilid_log!(self debug
563                "RoutingContext::watch_dht_values(self: {:?}, key: {:?}, subkeys: {:?}, expiration: {:?}, count: {:?})", self, record_key, subkeys, expiration, count);
564            let subkeys = subkeys.unwrap_or_default();
565            let expiration = expiration.unwrap_or_default();
566            let count = count.unwrap_or(u32::MAX);
567
568            self.api
569                .core_context()?
570                .storage_manager()
571                .check_record_key(&record_key)?;
572
573            let storage_manager = self.api.core_context()?.storage_manager();
574            Box::pin(storage_manager.watch_values(record_key, subkeys, expiration, count)).await
575        }).await.inspect_err(log_veilid_api_error!(self))
576    }
577
578    /// Cancels a watch early.
579    ///
580    /// This is a convenience function that cancels watching all subkeys in a range. The subkeys specified here
581    /// are subtracted from the currently-watched subkey range.  Can only be used on opened records.
582    /// * `subkeys`:
583    ///   - None: specifies watching the entire range of subkeys.
584    ///   - Some(range): is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys.
585    ///
586    /// Only the subkey range is changed, the expiration and count remain the same.
587    /// If no subkeys remain, the watch is entirely cancelled and will receive no more updates.
588    ///
589    /// Returns Ok(true) if a watch is active for this record.
590    /// Returns Ok(false) if the entire watch has been cancelled.
591    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
592    pub async fn cancel_dht_watch(
593        &self,
594        record_key: RecordKey,
595        subkeys: Option<ValueSubkeyRangeSet>,
596    ) -> VeilidAPIResult<bool> {
597        record_duration_fut(async {
598            veilid_log!(self debug
599                "RoutingContext::cancel_dht_watch(self: {:?}, key: {:?}, subkeys: {:?}", self, record_key, subkeys);
600            let subkeys = subkeys.unwrap_or_default();
601
602            self.api
603                .core_context()?
604                .storage_manager()
605                .check_record_key(&record_key)?;
606
607            let storage_manager = self.api.core_context()?.storage_manager();
608            Box::pin(storage_manager.cancel_watch_values(record_key, subkeys)).await
609        }).await.inspect_err(log_veilid_api_error!(self))
610    }
611
612    /// Inspects a DHT record for subkey state.
613    /// This is useful for checking if you should push new subkeys to the network, or retrieve the current state of a record from the network
614    /// to see what needs updating locally. Can only be used on opened records.
615    ///
616    /// * `key` is the record key to inspect. it must first be opened for reading or writing.
617    /// * `subkeys`:
618    ///   - None: specifies inspecting the entire range of subkeys.
619    ///   - Some(range): is the the range of subkeys to inspect. The range must not exceed 512 discrete non-overlapping or adjacent subranges.
620    ///     If no range is specified, this is equivalent to watching the entire range of subkeys.
621    ///
622    /// * `scope` is what kind of range the inspection has:
623    ///   - DHTReportScope::Local`
624    ///     Results will be only for a locally stored record.
625    ///     Useful for seeing what subkeys you have locally and which ones have not been retrieved.
626    ///
627    ///   - `DHTReportScope::SyncGet`
628    ///     Return the local sequence numbers and the network sequence numbers with GetValue fanout parameters.
629    ///     Provides an independent view of both the local sequence numbers and the network sequence numbers for nodes that
630    ///     would be reached as if the local copy did not exist locally.
631    ///     Useful for determining if the current local copy should be updated from the network.
632    ///
633    ///   - `DHTReportScope::SyncSet`
634    ///     Return the local sequence numbers and the network sequence numbers with SetValue fanout parameters.
635    ///     Provides an independent view of both the local sequence numbers and the network sequence numbers for nodes that
636    ///     would be reached as if the local copy did not exist locally.
637    ///     Useful for determining if the unchanged local copy should be pushed to the network.
638    ///
639    ///   - `DHTReportScope::UpdateGet`
640    ///     Return the local sequence numbers and the network sequence numbers with GetValue fanout parameters.
641    ///     Provides an view of both the local sequence numbers and the network sequence numbers for nodes that
642    ///     would be reached as if a GetValue operation were being performed, including accepting newer values from the network.
643    ///     Useful for determining which subkeys would change with a GetValue operation.
644    ///
645    ///   - `DHTReportScope::UpdateSet`
646    ///     Return the local sequence numbers and the network sequence numbers with SetValue fanout parameters.
647    ///     Provides an view of both the local sequence numbers and the network sequence numbers for nodes that
648    ///     would be reached as if a SetValue operation were being performed, including accepting newer values from the network.
649    ///     This simulates a SetValue with the initial sequence number incremented by 1, like a real SetValue would when updating.
650    ///     Useful for determine which subkeys would change with an SetValue operation.
651    ///
652    /// Returns `Ok(DHTRecordReport)` with the subkey ranges that were returned that overlapped the schema, and sequence numbers for each of the subkeys in the range.
653    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
654    pub async fn inspect_dht_record(
655        &self,
656        record_key: RecordKey,
657        subkeys: Option<ValueSubkeyRangeSet>,
658        scope: DHTReportScope,
659    ) -> VeilidAPIResult<DHTRecordReport> {
660        record_duration_fut(async {
661            veilid_log!(self debug
662                "RoutingContext::inspect_dht_record(self: {:?}, record_key: {:?}, subkeys: {:?}, scope: {:?})", self, record_key, subkeys, scope);
663            let subkeys = subkeys.unwrap_or_default();
664
665            self.api
666                .core_context()?
667                .storage_manager()
668                .check_record_key(&record_key)?;
669
670            let storage_manager = self.api.core_context()?.storage_manager();
671            Box::pin(storage_manager.inspect_record(record_key, subkeys, scope)).await
672        }).await.inspect_err(log_veilid_api_error!(self))
673    }
674
675    ///////////////////////////////////
676    // Block Store
677
678    #[cfg(feature = "unstable-blockstore")]
679    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
680    pub async fn find_block(&self, _block_id: BlockId) -> VeilidAPIResult<Vec<u8>> {
681        panic!("unimplemented");
682    }
683
684    #[cfg(feature = "unstable-blockstore")]
685    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret,))]
686    pub async fn supply_block(&self, _block_id: BlockId) -> VeilidAPIResult<bool> {
687        panic!("unimplemented");
688    }
689}