1use super::*;
2
// Declares "veilid_api" as the log facility for this file; the `instrument`
// attributes below use the same string as their tracing target.
// NOTE(review): presumably consumed by the `veilid_log!` invocations in this
// file — confirm against the macro definition.
impl_veilid_log_facility!("veilid_api");
4
/// Destination of an application call or message issued through a
/// [`RoutingContext`].
///
/// Without the `footgun` feature, `RoutingContext::app_call` and
/// `RoutingContext::app_message` only accept the [`Target::RouteId`] variant.
#[derive(
    Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema,
)]
#[cfg_attr(
    all(target_arch = "wasm32", target_os = "unknown"),
    derive(Tsify),
    tsify(from_wasm_abi, into_wasm_abi, namespace)
)]
#[must_use]
pub enum Target {
    /// Address a node by its node id (represented as a string in the JSON schema).
    #[schemars(with = "String")]
    NodeId(NodeId),
    /// Address a route by its route id (represented as a string in the JSON schema).
    #[schemars(with = "String")]
    RouteId(RouteId),
}
25
/// State held behind the `Arc` inside a [`RoutingContext`].
///
/// Nothing in this file mutates it after construction: the `with_*` builders
/// allocate a fresh instance instead.
pub(crate) struct RoutingContextUnlockedInner {
    /// Safety selection passed along with the operations issued through the context.
    safety_selection: SafetySelection,
}
30
/// Handle for issuing application calls/messages and DHT operations with a
/// particular [`SafetySelection`].
///
/// Clones share the same `unlocked_inner` allocation; the `with_*` builder
/// methods return a new context with its own inner state.
#[derive(Clone)]
#[must_use]
pub struct RoutingContext {
    /// API handle used to reach the configuration, crypto and core components.
    api: VeilidAPI,
    /// Shared state (currently only the safety selection) for this context.
    unlocked_inner: Arc<RoutingContextUnlockedInner>,
}
43
44impl fmt::Debug for RoutingContext {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 f.debug_struct("RoutingContext")
47 .field("ptr", &format!("{:p}", Arc::as_ptr(&self.unlocked_inner)))
48 .field("safety_selection", &self.unlocked_inner.safety_selection)
49 .finish()
50 }
51}
52
53impl RoutingContext {
54 pub(super) fn try_new(api: VeilidAPI) -> VeilidAPIResult<Self> {
57 let config = api.config()?;
58
59 Ok(Self {
60 api,
61 unlocked_inner: Arc::new(RoutingContextUnlockedInner {
62 safety_selection: SafetySelection::Safe(SafetySpec {
63 preferred_route: None,
64 hop_count: config.network.rpc.default_route_hop_count as usize,
65 stability: Stability::Reliable,
66 sequencing: Sequencing::PreferOrdered,
67 }),
68 }),
69 })
70 }
71
72 #[must_use]
73 pub(crate) fn log_key(&self) -> &str {
74 self.api.log_key()
75 }
76
77 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
88 pub fn with_default_safety(self) -> VeilidAPIResult<Self> {
89 let this = self.clone();
90 record_duration(|| {
91 veilid_log!(self debug
92 "RoutingContext::with_default_safety(self: {:?})", self);
93
94 let config = self.api.config()?;
95
96 self.with_safety(SafetySelection::Safe(SafetySpec {
97 preferred_route: None,
98 hop_count: config.network.rpc.default_route_hop_count as usize,
99 stability: Stability::Reliable,
100 sequencing: Sequencing::PreferOrdered,
101 }))
102 })
103 .inspect_err(log_veilid_api_error!(this))
104 }
105
106 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
108 pub fn with_safety(self, safety_selection: SafetySelection) -> VeilidAPIResult<Self> {
109 let this = self.clone();
110 record_duration(|| {
111 veilid_log!(self debug
112 "RoutingContext::with_safety(self: {:?}, safety_selection: {:?})", self, safety_selection);
113
114 if let SafetySelection::Unsafe(_) = &safety_selection {
115 #[cfg(not(feature = "footgun"))]
116 {
117 return Err(VeilidAPIError::generic(
118 "Unsafe routing mode is not allowed without the 'footgun' feature enabled",
119 ));
120 }
121 }
122
123 if let SafetySelection::Safe(safe) = &safety_selection {
124 if let Some(preferred_route) = &safe.preferred_route {
125 self.api
126 .core_context()?
127 .routing_table()
128 .check_route_id(preferred_route)?;
129 }
130 }
131
132 Ok(Self {
133 api: self.api.clone(),
134 unlocked_inner: Arc::new(RoutingContextUnlockedInner { safety_selection }),
135 })
136 }).inspect_err(log_veilid_api_error!(this))
137 }
138
139 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
141 pub fn with_sequencing(self, sequencing: Sequencing) -> Self {
142 record_duration(|| {
143 veilid_log!(self debug
144 "RoutingContext::with_sequencing(self: {:?}, sequencing: {:?})", self, sequencing);
145
146 Self {
147 api: self.api.clone(),
148 unlocked_inner: Arc::new(RoutingContextUnlockedInner {
149 safety_selection: match &self.unlocked_inner.safety_selection {
150 SafetySelection::Unsafe(_) => SafetySelection::Unsafe(sequencing),
151 SafetySelection::Safe(safety_spec) => SafetySelection::Safe(SafetySpec {
152 preferred_route: safety_spec.preferred_route.clone(),
153 hop_count: safety_spec.hop_count,
154 stability: safety_spec.stability,
155 sequencing,
156 }),
157 },
158 }),
159 }
160 })
161 }
162
163 pub fn safety(&self) -> SafetySelection {
165 self.unlocked_inner.safety_selection.clone()
166 }
167
168 pub fn sequencing(&self) -> Sequencing {
170 match &self.unlocked_inner.safety_selection {
171 SafetySelection::Unsafe(sequencing) => *sequencing,
172 SafetySelection::Safe(safety_spec) => safety_spec.sequencing,
173 }
174 }
175
176 pub fn api(&self) -> VeilidAPI {
178 self.api.clone()
179 }
180
181 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
182 async fn get_destination(&self, target: Target) -> VeilidAPIResult<rpc_processor::Destination> {
183 record_duration_fut(async {
184 veilid_log!(self debug
185 "RoutingContext::get_destination(self: {:?}, target: {:?})", self, target);
186
187 let rpc_processor = self.api.core_context()?.rpc_processor();
188 Box::pin(rpc_processor.resolve_target_to_destination(
189 target,
190 self.unlocked_inner.safety_selection.clone(),
191 ))
192 .await
193 .map_err(VeilidAPIError::invalid_target)
194 })
195 .await
196 .inspect_err(log_veilid_api_error!(self))
197 }
198
199 fn check_target(&self, target: &Target) -> VeilidAPIResult<()> {
200 match target {
201 Target::NodeId(node_id) => {
202 self.api
203 .core_context()?
204 .routing_table()
205 .check_node_id(node_id)?;
206 }
207 Target::RouteId(route_id) => {
208 self.api
209 .core_context()?
210 .routing_table()
211 .check_route_id(route_id)?;
212 }
213 }
214 Ok(())
215 }
216
217 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", skip(message), fields(duration, __VEILID_LOG_KEY = self.log_key(), message_len = message.len(), ret.len)))]
221 async fn internal_app_call(
222 &self,
223 target: Target,
224 message: Vec<u8>,
225 ) -> VeilidAPIResult<Vec<u8>> {
226 record_duration_fut(async {
227 veilid_log!(self debug
228 "RoutingContext::app_call(self: {:?}, target: {:?}, message_len: {:?})", self, target, message.len());
229 veilid_log!(self trace "message: {:?}", message);
230
231 self.check_target(&target)?;
232
233 let rpc_processor = self.api.core_context()?.rpc_processor();
234
235 let dest = self.get_destination(target).await?;
237
238 let answer = match Box::pin(rpc_processor.rpc_call_app_call(dest, message)).await {
240 Ok(NetworkResult::Value(v)) => v,
241 Ok(NetworkResult::Timeout) => apibail_timeout!(),
242 Ok(NetworkResult::ServiceUnavailable(e)) => apibail_invalid_target!(e),
243 Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
244 apibail_no_connection!(e);
245 }
246
247 Ok(NetworkResult::InvalidMessage(message)) => {
248 apibail_generic!(message);
249 }
250 Err(e) => return Err(e.into()),
251 };
252
253 tracing::Span::current().record("ret.len", answer.answer.len());
254
255 Ok(answer.answer)
256 }).await.inspect_err(log_veilid_api_error!(self))
257 }
258
259 #[cfg(feature = "footgun")]
260 pub async fn app_call(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<Vec<u8>> {
269 self.internal_app_call(target, message).await
270 }
271
272 #[cfg(not(feature = "footgun"))]
273 pub async fn app_call(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<Vec<u8>> {
282 match target {
283 Target::RouteId(_) => self.internal_app_call(target, message).await,
284 Target::NodeId(_) => Err(VeilidAPIError::invalid_target(
285 "Only RouteId targets are allowed without the footgun feature",
286 )),
287 }
288 }
289
290 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", skip(message), fields(duration, __VEILID_LOG_KEY = self.log_key(), message_len = message.len()), ret))]
291 async fn internal_app_message(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<()> {
292 record_duration_fut(async {
293 veilid_log!(self debug
294 "RoutingContext::app_message(self: {:?}, target: {:?}, message_len: {})", self, target, message.len());
295 veilid_log!(self trace "message: {:?}", message);
296
297 self.check_target(&target)?;
298
299 let rpc_processor = self.api.core_context()?.rpc_processor();
300
301 let dest = self.get_destination(target).await?;
303
304 match Box::pin(rpc_processor.rpc_call_app_message(dest, message)).await {
306 Ok(NetworkResult::Value(())) => {}
307 Ok(NetworkResult::Timeout) => apibail_timeout!(),
308 Ok(NetworkResult::ServiceUnavailable(e)) => apibail_invalid_target!(e),
309 Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
310 apibail_no_connection!(e);
311 }
312 Ok(NetworkResult::InvalidMessage(message)) => {
313 apibail_generic!(message);
314 }
315 Err(e) => return Err(e.into()),
316 };
317
318 Ok(())
319 }).await.inspect_err(log_veilid_api_error!(self))
320 }
321
322 #[cfg(feature = "footgun")]
323 pub async fn app_message(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<()> {
330 self.internal_app_message(target, message).await
331 }
332
333 #[cfg(not(feature = "footgun"))]
334 pub async fn app_message(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<()> {
341 match target {
342 Target::RouteId(_) => self.internal_app_message(target, message).await,
343 Target::NodeId(_) => Err(VeilidAPIError::invalid_target(
344 "Only PrivateRoute targets are allowed without the footgun feature",
345 )),
346 }
347 }
348
349 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
362 pub async fn create_dht_record(
363 &self,
364 kind: CryptoKind,
365 schema: DHTSchema,
366 owner: Option<KeyPair>,
367 ) -> VeilidAPIResult<DHTRecordDescriptor> {
368 record_duration_fut(async {
369 veilid_log!(self debug
370 "RoutingContext::create_dht_record(self: {:?}, schema: {:?}, owner: {:?}, kind: {:?})", self, schema, owner, kind);
371 Crypto::validate_crypto_kind(kind)?;
372 schema.validate()?;
373 if let Some(owner) = &owner {
374 self.api.crypto()?.check_keypair(owner)?;
375 }
376
377 let storage_manager = self.api.core_context()?.storage_manager();
378 Box::pin(storage_manager.create_record(
379 kind,
380 schema,
381 owner,
382 self.unlocked_inner.safety_selection.clone(),
383 ))
384 .await
385 }).await.inspect_err(log_veilid_api_error!(self))
386 }
387
388 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
400 pub async fn open_dht_record(
401 &self,
402 record_key: RecordKey,
403 default_writer: Option<KeyPair>,
404 ) -> VeilidAPIResult<DHTRecordDescriptor> {
405 record_duration_fut(async {
406 veilid_log!(self debug
407 "RoutingContext::open_dht_record(self: {:?}, key: {:?}, default_writer: {:?})", self, record_key, default_writer);
408
409 self.api
410 .core_context()?
411 .storage_manager()
412 .check_record_key(&record_key)?;
413 if let Some(default_writer) = &default_writer {
414 self.api.crypto()?.check_keypair(default_writer)?;
415 }
416
417 let storage_manager = self.api.core_context()?.storage_manager();
418 storage_manager
419 .open_record(
420 record_key,
421 default_writer,
422 self.unlocked_inner.safety_selection.clone(),
423 )
424 .await
425 }).await.inspect_err(log_veilid_api_error!(self))
426 }
427
428 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
432 pub async fn close_dht_record(&self, record_key: RecordKey) -> VeilidAPIResult<()> {
433 record_duration_fut(async {
434 veilid_log!(self debug
435 "RoutingContext::close_dht_record(self: {:?}, key: {:?})", self, record_key);
436
437 self.api
438 .core_context()?
439 .storage_manager()
440 .check_record_key(&record_key)?;
441
442 let storage_manager = self.api.core_context()?.storage_manager();
443 Box::pin(storage_manager.close_record(record_key)).await
444 })
445 .await
446 .inspect_err(log_veilid_api_error!(self))
447 }
448
449 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
455 pub async fn delete_dht_record(&self, record_key: RecordKey) -> VeilidAPIResult<()> {
456 record_duration_fut(async {
457 veilid_log!(self debug
458 "RoutingContext::delete_dht_record(self: {:?}, key: {:?})", self, record_key);
459
460 self.api
461 .core_context()?
462 .storage_manager()
463 .check_record_key(&record_key)?;
464
465 let storage_manager = self.api.core_context()?.storage_manager();
466 Box::pin(storage_manager.delete_record(record_key)).await
467 })
468 .await
469 .inspect_err(log_veilid_api_error!(self))
470 }
471
472 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
479 pub async fn get_dht_value(
480 &self,
481 record_key: RecordKey,
482 subkey: ValueSubkey,
483 force_refresh: bool,
484 ) -> VeilidAPIResult<Option<ValueData>> {
485 record_duration_fut(async {
486 veilid_log!(self debug
487 "RoutingContext::get_dht_value(self: {:?}, key: {:?}, subkey: {:?}, force_refresh: {:?})", self, record_key, subkey, force_refresh);
488
489 self.api
490 .core_context()?
491 .storage_manager()
492 .check_record_key(&record_key)?;
493
494 let storage_manager = self.api.core_context()?.storage_manager();
495 Box::pin(storage_manager.get_value(record_key, subkey, force_refresh)).await
496 }).await.inspect_err(log_veilid_api_error!(self))
497 }
498
499 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", skip(data), fields(duration, __VEILID_LOG_KEY = self.log_key(), data = print_data(&data, Some(64))), ret))]
507 pub async fn set_dht_value(
508 &self,
509 record_key: RecordKey,
510 subkey: ValueSubkey,
511 data: Vec<u8>,
512 options: Option<SetDHTValueOptions>,
513 ) -> VeilidAPIResult<Option<ValueData>> {
514 record_duration_fut(async {
515 veilid_log!(self debug
516 "RoutingContext::set_dht_value(self: {:?}, key: {:?}, subkey: {:?}, data: len={}, options: {:?})", self, record_key, subkey, data.len(), options);
517
518 self.api
519 .core_context()?
520 .storage_manager()
521 .check_record_key(&record_key)?;
522
523 let storage_manager = self.api.core_context()?.storage_manager();
524 Box::pin(storage_manager.set_value(record_key, subkey, data, options)).await
525 }).await.inspect_err(log_veilid_api_error!(self))
526 }
527
528 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
554 pub async fn watch_dht_values(
555 &self,
556 record_key: RecordKey,
557 subkeys: Option<ValueSubkeyRangeSet>,
558 expiration: Option<Timestamp>,
559 count: Option<u32>,
560 ) -> VeilidAPIResult<bool> {
561 record_duration_fut(async {
562 veilid_log!(self debug
563 "RoutingContext::watch_dht_values(self: {:?}, key: {:?}, subkeys: {:?}, expiration: {:?}, count: {:?})", self, record_key, subkeys, expiration, count);
564 let subkeys = subkeys.unwrap_or_default();
565 let expiration = expiration.unwrap_or_default();
566 let count = count.unwrap_or(u32::MAX);
567
568 self.api
569 .core_context()?
570 .storage_manager()
571 .check_record_key(&record_key)?;
572
573 let storage_manager = self.api.core_context()?.storage_manager();
574 Box::pin(storage_manager.watch_values(record_key, subkeys, expiration, count)).await
575 }).await.inspect_err(log_veilid_api_error!(self))
576 }
577
578 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
592 pub async fn cancel_dht_watch(
593 &self,
594 record_key: RecordKey,
595 subkeys: Option<ValueSubkeyRangeSet>,
596 ) -> VeilidAPIResult<bool> {
597 record_duration_fut(async {
598 veilid_log!(self debug
599 "RoutingContext::cancel_dht_watch(self: {:?}, key: {:?}, subkeys: {:?}", self, record_key, subkeys);
600 let subkeys = subkeys.unwrap_or_default();
601
602 self.api
603 .core_context()?
604 .storage_manager()
605 .check_record_key(&record_key)?;
606
607 let storage_manager = self.api.core_context()?.storage_manager();
608 Box::pin(storage_manager.cancel_watch_values(record_key, subkeys)).await
609 }).await.inspect_err(log_veilid_api_error!(self))
610 }
611
612 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
654 pub async fn inspect_dht_record(
655 &self,
656 record_key: RecordKey,
657 subkeys: Option<ValueSubkeyRangeSet>,
658 scope: DHTReportScope,
659 ) -> VeilidAPIResult<DHTRecordReport> {
660 record_duration_fut(async {
661 veilid_log!(self debug
662 "RoutingContext::inspect_dht_record(self: {:?}, record_key: {:?}, subkeys: {:?}, scope: {:?})", self, record_key, subkeys, scope);
663 let subkeys = subkeys.unwrap_or_default();
664
665 self.api
666 .core_context()?
667 .storage_manager()
668 .check_record_key(&record_key)?;
669
670 let storage_manager = self.api.core_context()?.storage_manager();
671 Box::pin(storage_manager.inspect_record(record_key, subkeys, scope)).await
672 }).await.inspect_err(log_veilid_api_error!(self))
673 }
674
    /// Placeholder for block lookup; only compiled with the
    /// `unstable-blockstore` feature.
    ///
    /// # Panics
    ///
    /// Always panics with the message "unimplemented"; `_block_id` is unused.
    #[cfg(feature = "unstable-blockstore")]
    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret))]
    pub async fn find_block(&self, _block_id: BlockId) -> VeilidAPIResult<Vec<u8>> {
        panic!("unimplemented");
    }
683
    /// Placeholder for block supply; only compiled with the
    /// `unstable-blockstore` feature.
    ///
    /// # Panics
    ///
    /// Always panics with the message "unimplemented"; `_block_id` is unused.
    #[cfg(feature = "unstable-blockstore")]
    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), ret,))]
    pub async fn supply_block(&self, _block_id: BlockId) -> VeilidAPIResult<bool> {
        panic!("unimplemented");
    }
689}