hyveos_sdk/connection.rs
1use std::{env, path::PathBuf, sync::Arc};
2
3use hyper_util::rt::TokioIo;
4use hyveos_core::BRIDGE_SOCKET_ENV_VAR;
5#[cfg(feature = "serde")]
6use serde::{de::DeserializeOwned, Serialize};
7use tokio::net::UnixStream;
8use tonic::transport::{Channel, Endpoint, Uri};
9use tower::service_fn;
10
11#[cfg(feature = "cbor")]
12use crate::services::CborReqRespService;
13#[cfg(feature = "json")]
14use crate::services::JsonReqRespService;
15#[cfg(feature = "scripting")]
16use crate::services::ScriptingService;
17use crate::{
18 error::{Error, Result},
19 services::{
20 DbService, DebugService, DhtService, DiscoveryService, FileTransferService,
21 GossipSubService, ReqRespService,
22 },
23};
24
25mod internal {
26 use std::future::Future;
27
28 use super::Connection;
29 use crate::error::Result;
30
31 pub trait ConnectionType {
32 // We can promise `Send` here, so let's do it.
33 fn connect(self) -> impl Future<Output = Result<Connection>> + Send;
34 }
35}
36
37pub trait ConnectionType: internal::ConnectionType {}
38
39impl<T: internal::ConnectionType> ConnectionType for T {}
40
41/// A connection to the HyveOS runtime through the scripting bridge.
42///
43/// The Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
44/// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
45///
46/// This is the standard connection type when used in a HyveOS script.
47#[derive(Debug, Clone)]
48pub struct BridgeConnection {
49 _private: (),
50}
51
52impl internal::ConnectionType for BridgeConnection {
53 async fn connect(self) -> Result<Connection> {
54 let channel = Endpoint::try_from("http://[::]:50051")?
55 .connect_with_connector(service_fn(|_: Uri| async {
56 let path = env::var(BRIDGE_SOCKET_ENV_VAR)
57 .map_err(|e| Error::EnvVarMissing(BRIDGE_SOCKET_ENV_VAR, e))?;
58
59 UnixStream::connect(path)
60 .await
61 .map_err(Error::from)
62 .map(TokioIo::new)
63 }))
64 .await?;
65
66 Ok(Connection {
67 channel,
68 #[cfg(feature = "network")]
69 reqwest_client_and_url: None,
70 shared_dir_path: None,
71 })
72 }
73}
74
75/// A connection to the HyveOS runtime through a custom Unix domain socket.
76#[derive(Debug, Clone)]
77pub struct CustomConnection {
78 socket_path: PathBuf,
79 shared_dir_path: PathBuf,
80}
81
82impl internal::ConnectionType for CustomConnection {
83 async fn connect(self) -> Result<Connection> {
84 let socket_path = Arc::new(self.socket_path);
85 let channel = Endpoint::try_from("http://[::]:50051")?
86 .connect_with_connector(service_fn(move |_: Uri| {
87 let socket_path = socket_path.clone();
88 async move {
89 UnixStream::connect(socket_path.as_path())
90 .await
91 .map_err(Error::from)
92 .map(TokioIo::new)
93 }
94 }))
95 .await?;
96
97 Ok(Connection {
98 channel,
99 #[cfg(feature = "network")]
100 reqwest_client_and_url: None,
101 shared_dir_path: Some(Arc::new(self.shared_dir_path)),
102 })
103 }
104}
105
/// Connection type that reaches a HyveOS runtime listening at a given URI over the network.
#[cfg(feature = "network")]
#[derive(Clone, Debug)]
pub struct UriConnection {
    uri: Uri,
}

#[cfg(feature = "network")]
impl internal::ConnectionType for UriConnection {
    async fn connect(self) -> Result<Connection> {
        let Self { uri } = self;

        // The URL / interface name are derived before dialling, so a URI that cannot be
        // converted is reported without attempting a connection.
        let (url, if_name) = uri_to_url_and_if_name(uri.clone())?;
        let channel = Endpoint::from(uri).connect().await?;

        let client_builder = reqwest::Client::builder();

        // Binding the HTTP client to an interface is only compiled in on these targets.
        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
        let client_builder = match if_name {
            Some(name) => client_builder.interface(&name),
            None => client_builder,
        };
        #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
        assert!(
            if_name.is_none(),
            "Interface name in URI is only supported on Android, Fuchsia, and Linux"
        );

        let client = client_builder.build()?;

        Ok(Connection {
            channel,
            reqwest_client_and_url: Some((client, url)),
            shared_dir_path: None,
        })
    }
}
144
/// Splits a URI into a [`reqwest::Url`] and an optional network interface name.
///
/// If the authority contains a bracketed IPv6 literal with a zone (`[addr%zone]`), the
/// `%zone` part is removed from the authority and returned separately. A zone that parses as
/// an interface index is translated with [`hyveos_ifaddr::if_index_to_name`]; if it does not
/// parse, or the lookup fails, the zone text itself is used as the interface name.
///
/// # Errors
///
/// Returns an error if the authority without the zone, the reassembled URI, or the final URL
/// fails to parse.
#[cfg(feature = "network")]
fn uri_to_url_and_if_name(uri: Uri) -> Result<(reqwest::Url, Option<String>)> {
    let mut parts = uri.into_parts();
    let mut if_name = None;

    if let Some(authority) = &parts.authority {
        let authority = authority.as_str();

        // `str::find` on a sub-slice yields an offset relative to that sub-slice, so every hit
        // is shifted back to an absolute offset into `authority` before it is used for slicing.
        // The `%` is only searched for between the brackets.
        let zone_span = authority.find('[').and_then(|open| {
            let close = open + authority[open..].find(']')?;
            let percent = open + authority[open..close].find('%')?;
            Some((percent, close))
        });

        if let Some((percent, close)) = zone_span {
            let zone = &authority[percent + 1..close];
            let name = zone
                .parse()
                .ok()
                .and_then(|index| hyveos_ifaddr::if_index_to_name(index).ok())
                .unwrap_or_else(|| zone.to_string());
            if_name = Some(name);

            // Drop `%zone` but keep the closing bracket.
            let mut stripped = authority.to_string();
            stripped.replace_range(percent..close, "");
            parts.authority = Some(stripped.parse()?);
        }
    }

    Ok((Uri::try_from(parts)?.to_string().parse()?, if_name))
}
172
173/// A builder for configuring a connection to the HyveOS runtime.
174#[derive(Debug, Clone)]
175pub struct ConnectionBuilder<T> {
176 connection_type: T,
177}
178
179impl Default for ConnectionBuilder<BridgeConnection> {
180 fn default() -> Self {
181 Self::new()
182 }
183}
184
185impl ConnectionBuilder<BridgeConnection> {
186 /// Creates a new builder for configuring a connection to the HyveOS runtime.
187 ///
188 /// By default, the connection to the HyveOS runtime will be made through the scripting bridge,
189 /// i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
190 /// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
191 /// If another connection type is desired, use the [`Self::custom`] or [`Self::uri`] methods.
192 #[must_use]
193 pub fn new() -> Self {
194 Self {
195 connection_type: BridgeConnection { _private: () },
196 }
197 }
198
199 /// Specifies a custom Unix domain socket to connect to.
200 ///
201 /// The socket path should point to a Unix domain socket that the HyveOS runtime is listening on.
202 /// The shared directory path should point to the shared directory that the HyveOS runtime is using.
203 ///
204 /// # Example
205 ///
206 /// ```no_run
207 /// use hyveos_sdk::Connection;
208 ///
209 /// # #[tokio::main]
210 /// # async fn main() {
211 /// let connection = Connection::builder()
212 /// .custom("/path/to/hyveos.sock", "/path/to/shared/dir")
213 /// .connect()
214 /// .await
215 /// .unwrap();
216 /// let mut discovery_service = connection.discovery();
217 /// let peer_id = discovery_service.get_own_id().await.unwrap();
218 ///
219 /// println!("My peer id: {peer_id}");
220 /// # }
221 /// ```
222 pub fn custom(
223 self,
224 socket_path: impl Into<PathBuf>,
225 shared_dir_path: impl Into<PathBuf>,
226 ) -> ConnectionBuilder<CustomConnection> {
227 ConnectionBuilder {
228 connection_type: CustomConnection {
229 socket_path: socket_path.into(),
230 shared_dir_path: shared_dir_path.into(),
231 },
232 }
233 }
234
235 /// Specifies a URI to connect to over the network.
236 ///
237 /// The URI should be in the format `http://<host>:<port>`.
238 /// A HyveOS runtime should be listening at the given address.
239 ///
240 /// > **Note**: If the provided URI's path is not just `/` (e.g. `http://example.com:12345/foo/bar/`),
241 /// > make sure that it ends with a slash!
242 ///
243 /// # Example
244 ///
245 /// ```no_run
246 /// use hyveos_sdk::{Connection, Uri};
247 ///
248 /// # #[tokio::main]
249 /// # async fn main() {
250 /// let uri = Uri::from_static("http://[::1]:50051");
251 /// let connection = Connection::builder()
252 /// .uri(uri)
253 /// .connect()
254 /// .await
255 /// .unwrap();
256 /// let mut discovery_service = connection.discovery();
257 /// let peer_id = discovery_service.get_own_id().await.unwrap();
258 ///
259 /// println!("My peer id: {peer_id}");
260 /// # }
261 /// ```
262 #[cfg(feature = "network")]
263 pub fn uri(self, uri: Uri) -> ConnectionBuilder<UriConnection> {
264 ConnectionBuilder {
265 connection_type: UriConnection { uri },
266 }
267 }
268}
269
270impl<T: ConnectionType> ConnectionBuilder<T> {
271 /// Establishes a connection to the HyveOS runtime.
272 ///
273 /// # Errors
274 ///
275 /// Returns an error if the connection could not be established.
276 ///
277 /// # Example
278 ///
279 /// ```no_run
280 /// use hyveos_sdk::Connection;
281 ///
282 /// # #[tokio::main]
283 /// # async fn main() {
284 /// let connection = Connection::builder()
285 /// .custom("/path/to/hyveos.sock", "/path/to/shared/dir")
286 /// .connect()
287 /// .await
288 /// .unwrap();
289 /// let mut discovery_service = connection.discovery();
290 /// let peer_id = discovery_service.get_own_id().await.unwrap();
291 ///
292 /// println!("My peer id: {peer_id}");
293 /// # }
294 /// ```
295 pub async fn connect(self) -> Result<Connection> {
296 self.connection_type.connect().await
297 }
298}
299
300/// A connection to the HyveOS runtime.
301///
302/// This struct provides access to the various services provided by HyveOS.
303///
304/// By default, the connection to the HyveOS runtime will be made through the scripting bridge,
305/// i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
306/// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
307/// If another connection type is desired, use the [`Self::builder`] function to get a
308/// [`ConnectionBuilder`] and use the [`ConnectionBuilder::custom`] or
309/// [`ConnectionBuilder::uri`] methods.
310///
311/// # Example
312///
313/// ```no_run
314/// use hyveos_sdk::Connection;
315///
316/// # #[tokio::main]
317/// # async fn main() {
318/// let connection = Connection::new().await.unwrap();
319/// let mut discovery_service = connection.discovery();
320/// let peer_id = discovery_service.get_own_id().await.unwrap();
321///
322/// println!("My peer id: {peer_id}");
323/// # }
324/// ```
325pub struct Connection {
326 pub(crate) channel: Channel,
327 #[cfg(feature = "network")]
328 pub(crate) reqwest_client_and_url: Option<(reqwest::Client, reqwest::Url)>,
329 pub(crate) shared_dir_path: Option<Arc<PathBuf>>,
330}
331
332impl Connection {
333 /// Establishes a connection to the HyveOS runtime through the scripting bridge.
334 ///
335 /// The Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
336 /// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
337 /// If another connection type is desired, use the [`Self::builder`] function to get a
338 /// [`ConnectionBuilder`] and use the [`ConnectionBuilder::custom`] or
339 /// [`ConnectionBuilder::uri`] methods.
340 ///
341 /// # Errors
342 ///
343 /// Returns an error if the connection could not be established.
344 ///
345 /// # Example
346 ///
347 /// ```no_run
348 /// use hyveos_sdk::Connection;
349 ///
350 /// # #[tokio::main]
351 /// # async fn main() {
352 /// let connection = Connection::new().await.unwrap();
353 /// let mut discovery_service = connection.discovery();
354 /// let peer_id = discovery_service.get_own_id().await.unwrap();
355 ///
356 /// println!("My peer id: {peer_id}");
357 /// # }
358 /// ```
359 pub async fn new() -> Result<Self> {
360 Connection::builder().connect().await
361 }
362
363 /// Creates a new builder for configuring a connection to the HyveOS runtime.
364 ///
365 /// By default, the connection to the HyveOS runtime will be made through the scripting bridge,
366 /// i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
367 /// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
368 /// If another connection type is desired, use the [`ConnectionBuilder::custom`] or
369 /// [`ConnectionBuilder::uri`] methods.
370 ///
371 /// # Example
372 ///
373 /// ```no_run
374 /// use hyveos_sdk::Connection;
375 ///
376 /// # #[tokio::main]
377 /// # async fn main() {
378 /// let connection = Connection::builder()
379 /// .custom("/path/to/hyveos.sock", "/path/to/shared/dir")
380 /// .connect()
381 /// .await
382 /// .unwrap();
383 /// let mut discovery_service = connection.discovery();
384 /// let peer_id = discovery_service.get_own_id().await.unwrap();
385 ///
386 /// println!("My peer id: {peer_id}");
387 /// # }
388 /// ```
389 #[must_use]
390 pub fn builder() -> ConnectionBuilder<BridgeConnection> {
391 ConnectionBuilder::new()
392 }
393
394 /// Returns a handle to the database service.
395 ///
396 /// # Example
397 ///
398 /// ```no_run
399 /// use hyveos_sdk::Connection;
400 ///
401 /// # #[tokio::main]
402 /// # async fn main() {
403 /// let connection = Connection::new().await.unwrap();
404 /// let mut db_service = connection.db();
405 /// assert!(db_service.put("key", b"value").await.unwrap().is_none());
406 ///
407 /// let value = db_service.get("key").await.unwrap().unwrap();
408 /// assert_eq!(value, b"value");
409 /// # }
410 /// ```
411 #[must_use]
412 pub fn db(&self) -> DbService {
413 DbService::new(self)
414 }
415
416 /// Returns a handle to the debug service.
417 ///
418 /// # Example
419 ///
420 /// ```no_run
421 /// use futures::TryStreamExt as _;
422 /// use hyveos_sdk::Connection;
423 ///
424 /// # #[tokio::main]
425 /// # async fn main() {
426 /// let connection = Connection::new().await.unwrap();
427 /// let mut debug_service = connection.debug();
428 /// let mut events = debug_service.subscribe_mesh_topology().await.unwrap();
429 ///
430 /// while let Some(event) = events.try_next().await.unwrap() {
431 /// println!("{event:?}");
432 /// }
433 /// # }
434 /// ```
435 #[must_use]
436 pub fn debug(&self) -> DebugService {
437 DebugService::new(self)
438 }
439
440 /// Returns a handle to the DHT service.
441 ///
442 /// # Example
443 ///
444 /// ```no_run
445 /// use hyveos_sdk::Connection;
446 ///
447 /// # #[tokio::main]
448 /// # async fn main() {
449 /// let connection = Connection::new().await.unwrap();
450 /// let mut dht_service = connection.dht();
451 /// let value = dht_service.get_record("topic", "key").await.unwrap();
452 ///
453 /// if let Some(value) = value.and_then(|value| String::from_utf8(value).ok()) {
454 /// println!("Record has value: {value}");
455 /// } else {
456 /// println!("Record not found");
457 /// }
458 /// # }
459 /// ```
460 #[must_use]
461 pub fn dht(&self) -> DhtService {
462 DhtService::new(self)
463 }
464
465 /// Returns a handle to the discovery service.
466 ///
467 /// # Example
468 ///
469 /// ```no_run
470 /// use hyveos_sdk::Connection;
471 ///
472 /// # #[tokio::main]
473 /// # async fn main() {
474 /// let connection = Connection::new().await.unwrap();
475 /// let mut discovery_service = connection.discovery();
476 /// let peer_id = discovery_service.get_own_id().await.unwrap();
477 ///
478 /// println!("My peer id: {peer_id}");
479 /// # }
480 /// ```
481 #[must_use]
482 pub fn discovery(&self) -> DiscoveryService {
483 DiscoveryService::new(self)
484 }
485
486 /// Returns a handle to the file transfer service.
487 ///
488 /// # Example
489 ///
490 /// ```no_run
491 /// use std::path::Path;
492 ///
493 /// use hyveos_sdk::Connection;
494 ///
495 /// # #[tokio::main]
496 /// # async fn main() {
497 /// let shared_dir = std::env::var(hyveos_core::BRIDGE_SHARED_DIR_ENV_VAR).unwrap();
498 /// let file_path = Path::new(&shared_dir).join("example.txt");
499 /// tokio::fs::write(&file_path, "Hello, world!").await.unwrap();
500 ///
501 /// let connection = Connection::new().await.unwrap();
502 /// let mut file_transfer_service = connection.file_transfer();
503 /// let cid = file_transfer_service.publish_file(&file_path).await.unwrap();
504 ///
505 /// println!("Content ID: {cid:?}");
506 /// # }
507 /// ```
508 #[must_use]
509 pub fn file_transfer(&self) -> FileTransferService {
510 FileTransferService::new(self)
511 }
512
513 /// Returns a handle to the gossipsub service.
514 ///
515 /// # Example
516 ///
517 /// ```no_run
518 /// use hyveos_sdk::Connection;
519 ///
520 /// # #[tokio::main]
521 /// # async fn main() {
522 /// let connection = Connection::new().await.unwrap();
523 /// let mut gossipsub_service = connection.gossipsub();
524 /// let id = gossipsub_service.publish("topic", "Hello, world!").await.unwrap();
525 ///
526 /// println!("Published message with id: {id}");
527 /// # }
528 /// ```
529 #[must_use]
530 pub fn gossipsub(&self) -> GossipSubService {
531 GossipSubService::new(self)
532 }
533
534 /// Returns a handle to the request-response service.
535 ///
536 /// # Example
537 ///
538 /// ```no_run
539 /// use futures::StreamExt as _;
540 /// use hyveos_sdk::Connection;
541 ///
542 /// # #[tokio::main]
543 /// # async fn main() {
544 /// let connection = Connection::new().await.unwrap();
545 /// let mut dht_service = connection.dht();
546 /// let peer_id = dht_service
547 /// .get_providers("identification", "example")
548 /// .await
549 /// .unwrap()
550 /// .next()
551 /// .await
552 /// .unwrap()
553 /// .unwrap();
554 ///
555 /// let mut req_resp_service = connection.req_resp();
556 /// let response = req_resp_service
557 /// .send_request(peer_id, "Hello, world!", None)
558 /// .await
559 /// .unwrap();
560 ///
561 /// let data = Vec::try_from(response).unwrap();
562 /// println!("Received response: {}", String::from_utf8(data).unwrap());
563 /// # }
564 /// ```
565 #[must_use]
566 pub fn req_resp(&self) -> ReqRespService {
567 ReqRespService::new(self)
568 }
569
570 /// Returns a handle to the request-response service with JSON-encoded requests and responses.
571 ///
572 /// # Example
573 ///
574 /// ```no_run
575 /// use futures::StreamExt as _;
576 /// use hyveos_sdk::Connection;
577 /// use serde::{Serialize, Deserialize};
578 ///
579 /// #[derive(Debug, Serialize, Deserialize)]
580 /// struct ExampleRequest {
581 /// message: String,
582 /// }
583 ///
584 /// #[derive(Debug, Serialize, Deserialize)]
585 /// struct ExampleResponse {
586 /// message: String,
587 /// }
588 ///
589 /// # #[tokio::main]
590 /// # async fn main() {
591 /// let connection = Connection::new().await.unwrap();
592 /// let mut dht_service = connection.dht();
593 /// let peer_id = dht_service
594 /// .get_providers("identification", "example")
595 /// .await
596 /// .unwrap()
597 /// .next()
598 /// .await
599 /// .unwrap()
600 /// .unwrap();
601 ///
602 /// let mut req_resp_service = connection.req_resp_json();
603 /// let request = ExampleRequest { message: "Hello, world!".to_string() };
604 /// let response = req_resp_service
605 /// .send_request(peer_id, &request, None)
606 /// .await
607 /// .unwrap();
608 ///
609 /// let data: ExampleResponse = Result::from(response).unwrap();
610 /// println!("Received response: {data:?}");
611 /// # }
612 /// ```
613 #[cfg(feature = "json")]
614 #[must_use]
615 pub fn req_resp_json<Req, Resp>(&self) -> JsonReqRespService<Req, Resp>
616 where
617 Req: Serialize + DeserializeOwned,
618 Resp: Serialize + DeserializeOwned,
619 {
620 JsonReqRespService::new(self)
621 }
622
623 /// Returns a handle to the request-response service with JSON-encoded requests and responses.
624 ///
625 /// # Example
626 ///
627 /// ```no_run
628 /// use futures::StreamExt as _;
629 /// use hyveos_sdk::Connection;
630 /// use serde::{Serialize, Deserialize};
631 ///
632 /// #[derive(Debug, Serialize, Deserialize)]
633 /// struct ExampleRequest {
634 /// message: String,
635 /// }
636 ///
637 /// #[derive(Debug, Serialize, Deserialize)]
638 /// struct ExampleResponse {
639 /// message: String,
640 /// }
641 ///
642 /// # #[tokio::main]
643 /// # async fn main() {
644 /// let connection = Connection::new().await.unwrap();
645 /// let mut dht_service = connection.dht();
646 /// let peer_id = dht_service
647 /// .get_providers("identification", "example")
648 /// .await
649 /// .unwrap()
650 /// .next()
651 /// .await
652 /// .unwrap()
653 /// .unwrap();
654 ///
655 /// let mut req_resp_service = connection.req_resp_cbor();
656 /// let request = ExampleRequest { message: "Hello, world!".to_string() };
657 /// let response = req_resp_service
658 /// .send_request(peer_id, &request, None)
659 /// .await
660 /// .unwrap();
661 ///
662 /// let data: ExampleResponse = Result::from(response).unwrap();
663 /// println!("Received response: {data:?}");
664 /// # }
665 /// ```
666 #[cfg(feature = "cbor")]
667 #[must_use]
668 pub fn req_resp_cbor<Req, Resp>(&self) -> CborReqRespService<Req, Resp>
669 where
670 Req: Serialize + DeserializeOwned,
671 Resp: Serialize + DeserializeOwned,
672 {
673 CborReqRespService::new(self)
674 }
675
676 /// Returns a handle to the scripting service.
677 ///
678 /// # Example
679 ///
680 /// ```no_run
681 /// use hyveos_sdk::{Connection, services::ScriptingConfig};
682 ///
683 /// # #[tokio::main]
684 /// # async fn main() {
685 /// let connection = Connection::new().await.unwrap();
686 /// let mut scripting_service = connection.scripting();
687 ///
688 /// let config = ScriptingConfig::new("my-docker-image:latest")
689 /// .local()
690 /// .expose_port(8080);
691 /// let script_id = scripting_service.deploy_script(config).await.unwrap();
692 ///
693 /// println!("Deployed script with id on self: {script_id}");
694 /// # }
695 /// ```
696 #[doc(hidden)]
697 #[cfg(feature = "scripting")]
698 #[must_use]
699 pub fn scripting(&self) -> ScriptingService {
700 ScriptingService::new(self)
701 }
702}