hyveos_sdk/
connection.rs

1use std::{env, path::PathBuf, sync::Arc};
2
3use hyper_util::rt::TokioIo;
4use hyveos_core::BRIDGE_SOCKET_ENV_VAR;
5#[cfg(feature = "serde")]
6use serde::{de::DeserializeOwned, Serialize};
7use tokio::net::UnixStream;
8use tonic::transport::{Channel, Endpoint, Uri};
9use tower::service_fn;
10
11#[cfg(feature = "cbor")]
12use crate::services::CborReqRespService;
13#[cfg(feature = "json")]
14use crate::services::JsonReqRespService;
15#[cfg(feature = "scripting")]
16use crate::services::ScriptingService;
17use crate::{
18    error::{Error, Result},
19    services::{
20        DbService, DebugService, DhtService, DiscoveryService, FileTransferService,
21        GossipSubService, ReqRespService,
22    },
23};
24
25mod internal {
26    use std::future::Future;
27
28    use super::Connection;
29    use crate::error::Result;
30
31    pub trait ConnectionType {
32        // We can promise `Send` here, so let's do it.
33        fn connect(self) -> impl Future<Output = Result<Connection>> + Send;
34    }
35}
36
37pub trait ConnectionType: internal::ConnectionType {}
38
39impl<T: internal::ConnectionType> ConnectionType for T {}
40
41/// A connection to the HyveOS runtime through the scripting bridge.
42///
43/// The Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
44/// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
45///
46/// This is the standard connection type when used in a HyveOS script.
47#[derive(Debug, Clone)]
48pub struct BridgeConnection {
49    _private: (),
50}
51
52impl internal::ConnectionType for BridgeConnection {
53    async fn connect(self) -> Result<Connection> {
54        let channel = Endpoint::try_from("http://[::]:50051")?
55            .connect_with_connector(service_fn(|_: Uri| async {
56                let path = env::var(BRIDGE_SOCKET_ENV_VAR)
57                    .map_err(|e| Error::EnvVarMissing(BRIDGE_SOCKET_ENV_VAR, e))?;
58
59                UnixStream::connect(path)
60                    .await
61                    .map_err(Error::from)
62                    .map(TokioIo::new)
63            }))
64            .await?;
65
66        Ok(Connection {
67            channel,
68            #[cfg(feature = "network")]
69            reqwest_client_and_url: None,
70            shared_dir_path: None,
71        })
72    }
73}
74
75/// A connection to the HyveOS runtime through a custom Unix domain socket.
76#[derive(Debug, Clone)]
77pub struct CustomConnection {
78    socket_path: PathBuf,
79    shared_dir_path: PathBuf,
80}
81
82impl internal::ConnectionType for CustomConnection {
83    async fn connect(self) -> Result<Connection> {
84        let socket_path = Arc::new(self.socket_path);
85        let channel = Endpoint::try_from("http://[::]:50051")?
86            .connect_with_connector(service_fn(move |_: Uri| {
87                let socket_path = socket_path.clone();
88                async move {
89                    UnixStream::connect(socket_path.as_path())
90                        .await
91                        .map_err(Error::from)
92                        .map(TokioIo::new)
93                }
94            }))
95            .await?;
96
97        Ok(Connection {
98            channel,
99            #[cfg(feature = "network")]
100            reqwest_client_and_url: None,
101            shared_dir_path: Some(Arc::new(self.shared_dir_path)),
102        })
103    }
104}
105
106/// A connection over the network to a HyveOS runtime listening at a given URI.
107#[cfg(feature = "network")]
108#[derive(Debug, Clone)]
109pub struct UriConnection {
110    uri: Uri,
111}
112
113#[cfg(feature = "network")]
114impl internal::ConnectionType for UriConnection {
115    async fn connect(self) -> Result<Connection> {
116        let (url, if_name) = uri_to_url_and_if_name(self.uri.clone())?;
117        let channel = Endpoint::from(self.uri).connect().await?;
118
119        #[cfg_attr(
120            not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")),
121            expect(unused_mut)
122        )]
123        let mut client_builder = reqwest::Client::builder();
124
125        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
126        if let Some(if_name) = if_name {
127            client_builder = client_builder.interface(&if_name);
128        }
129        #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
130        assert!(
131            if_name.is_none(),
132            "Interface name in URI is only supported on Android, Fuchsia, and Linux"
133        );
134
135        let client = client_builder.build()?;
136
137        Ok(Connection {
138            channel,
139            reqwest_client_and_url: Some((client, url)),
140            shared_dir_path: None,
141        })
142    }
143}
144
145#[cfg(feature = "network")]
146fn uri_to_url_and_if_name(uri: Uri) -> Result<(reqwest::Url, Option<String>)> {
147    let mut parts = uri.into_parts();
148    let mut if_name = None;
149    if let Some(authority) = &parts.authority {
150        let authority = authority.as_str();
151
152        if let Some(ipv6_start) = authority.find('[') {
153            if let Some(start) = authority[ipv6_start..].find('%') {
154                if let Some(end) = authority[start..].find(']') {
155                    let zone = &authority[start + 1..end];
156                    let name = zone
157                        .parse()
158                        .ok()
159                        .and_then(|index| hyveos_ifaddr::if_index_to_name(index).ok())
160                        .unwrap_or_else(|| zone.to_string());
161                    if_name = Some(name);
162                    let mut authority = authority.to_string();
163                    authority.replace_range(start..end, "");
164                    parts.authority = Some(authority.parse()?);
165                }
166            }
167        }
168    }
169
170    Ok((Uri::try_from(parts)?.to_string().parse()?, if_name))
171}
172
173/// A builder for configuring a connection to the HyveOS runtime.
174#[derive(Debug, Clone)]
175pub struct ConnectionBuilder<T> {
176    connection_type: T,
177}
178
179impl Default for ConnectionBuilder<BridgeConnection> {
180    fn default() -> Self {
181        Self::new()
182    }
183}
184
185impl ConnectionBuilder<BridgeConnection> {
186    /// Creates a new builder for configuring a connection to the HyveOS runtime.
187    ///
188    /// By default, the connection to the HyveOS runtime will be made through the scripting bridge,
189    /// i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
190    /// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
191    /// If another connection type is desired, use the [`Self::custom`] or [`Self::uri`] methods.
192    #[must_use]
193    pub fn new() -> Self {
194        Self {
195            connection_type: BridgeConnection { _private: () },
196        }
197    }
198
199    /// Specifies a custom Unix domain socket to connect to.
200    ///
201    /// The socket path should point to a Unix domain socket that the HyveOS runtime is listening on.
202    /// The shared directory path should point to the shared directory that the HyveOS runtime is using.
203    ///
204    /// # Example
205    ///
206    /// ```no_run
207    /// use hyveos_sdk::Connection;
208    ///
209    /// # #[tokio::main]
210    /// # async fn main() {
211    /// let connection = Connection::builder()
212    ///     .custom("/path/to/hyveos.sock", "/path/to/shared/dir")
213    ///     .connect()
214    ///     .await
215    ///     .unwrap();
216    /// let mut discovery_service = connection.discovery();
217    /// let peer_id = discovery_service.get_own_id().await.unwrap();
218    ///
219    /// println!("My peer id: {peer_id}");
220    /// # }
221    /// ```
222    pub fn custom(
223        self,
224        socket_path: impl Into<PathBuf>,
225        shared_dir_path: impl Into<PathBuf>,
226    ) -> ConnectionBuilder<CustomConnection> {
227        ConnectionBuilder {
228            connection_type: CustomConnection {
229                socket_path: socket_path.into(),
230                shared_dir_path: shared_dir_path.into(),
231            },
232        }
233    }
234
235    /// Specifies a URI to connect to over the network.
236    ///
237    /// The URI should be in the format `http://<host>:<port>`.
238    /// A HyveOS runtime should be listening at the given address.
239    ///
240    /// > **Note**: If the provided URI's path is not just `/` (e.g. `http://example.com:12345/foo/bar/`),
241    /// > make sure that it ends with a slash!
242    ///
243    /// # Example
244    ///
245    /// ```no_run
246    /// use hyveos_sdk::{Connection, Uri};
247    ///
248    /// # #[tokio::main]
249    /// # async fn main() {
250    /// let uri = Uri::from_static("http://[::1]:50051");
251    /// let connection = Connection::builder()
252    ///     .uri(uri)
253    ///     .connect()
254    ///     .await
255    ///     .unwrap();
256    /// let mut discovery_service = connection.discovery();
257    /// let peer_id = discovery_service.get_own_id().await.unwrap();
258    ///
259    /// println!("My peer id: {peer_id}");
260    /// # }
261    /// ```
262    #[cfg(feature = "network")]
263    pub fn uri(self, uri: Uri) -> ConnectionBuilder<UriConnection> {
264        ConnectionBuilder {
265            connection_type: UriConnection { uri },
266        }
267    }
268}
269
270impl<T: ConnectionType> ConnectionBuilder<T> {
271    /// Establishes a connection to the HyveOS runtime.
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if the connection could not be established.
276    ///
277    /// # Example
278    ///
279    /// ```no_run
280    /// use hyveos_sdk::Connection;
281    ///
282    /// # #[tokio::main]
283    /// # async fn main() {
284    /// let connection = Connection::builder()
285    ///     .custom("/path/to/hyveos.sock", "/path/to/shared/dir")
286    ///     .connect()
287    ///     .await
288    ///     .unwrap();
289    /// let mut discovery_service = connection.discovery();
290    /// let peer_id = discovery_service.get_own_id().await.unwrap();
291    ///
292    /// println!("My peer id: {peer_id}");
293    /// # }
294    /// ```
295    pub async fn connect(self) -> Result<Connection> {
296        self.connection_type.connect().await
297    }
298}
299
300/// A connection to the HyveOS runtime.
301///
302/// This struct provides access to the various services provided by HyveOS.
303///
304/// By default, the connection to the HyveOS runtime will be made through the scripting bridge,
305/// i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
306/// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
307/// If another connection type is desired, use the [`Self::builder`] function to get a
308/// [`ConnectionBuilder`] and use the [`ConnectionBuilder::custom`] or
309/// [`ConnectionBuilder::uri`] methods.
310///
311/// # Example
312///
313/// ```no_run
314/// use hyveos_sdk::Connection;
315///
316/// # #[tokio::main]
317/// # async fn main() {
318/// let connection = Connection::new().await.unwrap();
319/// let mut discovery_service = connection.discovery();
320/// let peer_id = discovery_service.get_own_id().await.unwrap();
321///
322/// println!("My peer id: {peer_id}");
323/// # }
324/// ```
325pub struct Connection {
326    pub(crate) channel: Channel,
327    #[cfg(feature = "network")]
328    pub(crate) reqwest_client_and_url: Option<(reqwest::Client, reqwest::Url)>,
329    pub(crate) shared_dir_path: Option<Arc<PathBuf>>,
330}
331
332impl Connection {
333    /// Establishes a connection to the HyveOS runtime through the scripting bridge.
334    ///
335    /// The Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
336    /// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
337    /// If another connection type is desired, use the [`Self::builder`] function to get a
338    /// [`ConnectionBuilder`] and use the [`ConnectionBuilder::custom`] or
339    /// [`ConnectionBuilder::uri`] methods.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if the connection could not be established.
344    ///
345    /// # Example
346    ///
347    /// ```no_run
348    /// use hyveos_sdk::Connection;
349    ///
350    /// # #[tokio::main]
351    /// # async fn main() {
352    /// let connection = Connection::new().await.unwrap();
353    /// let mut discovery_service = connection.discovery();
354    /// let peer_id = discovery_service.get_own_id().await.unwrap();
355    ///
356    /// println!("My peer id: {peer_id}");
357    /// # }
358    /// ```
359    pub async fn new() -> Result<Self> {
360        Connection::builder().connect().await
361    }
362
363    /// Creates a new builder for configuring a connection to the HyveOS runtime.
364    ///
365    /// By default, the connection to the HyveOS runtime will be made through the scripting bridge,
366    /// i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable
367    /// ([`hyveos_core::BRIDGE_SOCKET_ENV_VAR`]) will be used to communicate with the runtime.
368    /// If another connection type is desired, use the [`ConnectionBuilder::custom`] or
369    /// [`ConnectionBuilder::uri`] methods.
370    ///
371    /// # Example
372    ///
373    /// ```no_run
374    /// use hyveos_sdk::Connection;
375    ///
376    /// # #[tokio::main]
377    /// # async fn main() {
378    /// let connection = Connection::builder()
379    ///     .custom("/path/to/hyveos.sock", "/path/to/shared/dir")
380    ///     .connect()
381    ///     .await
382    ///     .unwrap();
383    /// let mut discovery_service = connection.discovery();
384    /// let peer_id = discovery_service.get_own_id().await.unwrap();
385    ///
386    /// println!("My peer id: {peer_id}");
387    /// # }
388    /// ```
389    #[must_use]
390    pub fn builder() -> ConnectionBuilder<BridgeConnection> {
391        ConnectionBuilder::new()
392    }
393
394    /// Returns a handle to the database service.
395    ///
396    /// # Example
397    ///
398    /// ```no_run
399    /// use hyveos_sdk::Connection;
400    ///
401    /// # #[tokio::main]
402    /// # async fn main() {
403    /// let connection = Connection::new().await.unwrap();
404    /// let mut db_service = connection.db();
405    /// assert!(db_service.put("key", b"value").await.unwrap().is_none());
406    ///
407    /// let value = db_service.get("key").await.unwrap().unwrap();
408    /// assert_eq!(value, b"value");
409    /// # }
410    /// ```
411    #[must_use]
412    pub fn db(&self) -> DbService {
413        DbService::new(self)
414    }
415
416    /// Returns a handle to the debug service.
417    ///
418    /// # Example
419    ///
420    /// ```no_run
421    /// use futures::TryStreamExt as _;
422    /// use hyveos_sdk::Connection;
423    ///
424    /// # #[tokio::main]
425    /// # async fn main() {
426    /// let connection = Connection::new().await.unwrap();
427    /// let mut debug_service = connection.debug();
428    /// let mut events = debug_service.subscribe_mesh_topology().await.unwrap();
429    ///
430    /// while let Some(event) = events.try_next().await.unwrap() {
431    ///     println!("{event:?}");
432    /// }
433    /// # }
434    /// ```
435    #[must_use]
436    pub fn debug(&self) -> DebugService {
437        DebugService::new(self)
438    }
439
440    /// Returns a handle to the DHT service.
441    ///
442    /// # Example
443    ///
444    /// ```no_run
445    /// use hyveos_sdk::Connection;
446    ///
447    /// # #[tokio::main]
448    /// # async fn main() {
449    /// let connection = Connection::new().await.unwrap();
450    /// let mut dht_service = connection.dht();
451    /// let value = dht_service.get_record("topic", "key").await.unwrap();
452    ///
453    /// if let Some(value) = value.and_then(|value| String::from_utf8(value).ok()) {
454    ///     println!("Record has value: {value}");
455    /// } else {
456    ///    println!("Record not found");
457    /// }
458    /// # }
459    /// ```
460    #[must_use]
461    pub fn dht(&self) -> DhtService {
462        DhtService::new(self)
463    }
464
465    /// Returns a handle to the discovery service.
466    ///
467    /// # Example
468    ///
469    /// ```no_run
470    /// use hyveos_sdk::Connection;
471    ///
472    /// # #[tokio::main]
473    /// # async fn main() {
474    /// let connection = Connection::new().await.unwrap();
475    /// let mut discovery_service = connection.discovery();
476    /// let peer_id = discovery_service.get_own_id().await.unwrap();
477    ///
478    /// println!("My peer id: {peer_id}");
479    /// # }
480    /// ```
481    #[must_use]
482    pub fn discovery(&self) -> DiscoveryService {
483        DiscoveryService::new(self)
484    }
485
486    /// Returns a handle to the file transfer service.
487    ///
488    /// # Example
489    ///
490    /// ```no_run
491    /// use std::path::Path;
492    ///
493    /// use hyveos_sdk::Connection;
494    ///
495    /// # #[tokio::main]
496    /// # async fn main() {
497    /// let shared_dir = std::env::var(hyveos_core::BRIDGE_SHARED_DIR_ENV_VAR).unwrap();
498    /// let file_path = Path::new(&shared_dir).join("example.txt");
499    /// tokio::fs::write(&file_path, "Hello, world!").await.unwrap();
500    ///
501    /// let connection = Connection::new().await.unwrap();
502    /// let mut file_transfer_service = connection.file_transfer();
503    /// let cid = file_transfer_service.publish_file(&file_path).await.unwrap();
504    ///
505    /// println!("Content ID: {cid:?}");
506    /// # }
507    /// ```
508    #[must_use]
509    pub fn file_transfer(&self) -> FileTransferService {
510        FileTransferService::new(self)
511    }
512
513    /// Returns a handle to the gossipsub service.
514    ///
515    /// # Example
516    ///
517    /// ```no_run
518    /// use hyveos_sdk::Connection;
519    ///
520    /// # #[tokio::main]
521    /// # async fn main() {
522    /// let connection = Connection::new().await.unwrap();
523    /// let mut gossipsub_service = connection.gossipsub();
524    /// let id = gossipsub_service.publish("topic", "Hello, world!").await.unwrap();
525    ///
526    /// println!("Published message with id: {id}");
527    /// # }
528    /// ```
529    #[must_use]
530    pub fn gossipsub(&self) -> GossipSubService {
531        GossipSubService::new(self)
532    }
533
534    /// Returns a handle to the request-response service.
535    ///
536    /// # Example
537    ///
538    /// ```no_run
539    /// use futures::StreamExt as _;
540    /// use hyveos_sdk::Connection;
541    ///
542    /// # #[tokio::main]
543    /// # async fn main() {
544    /// let connection = Connection::new().await.unwrap();
545    /// let mut dht_service = connection.dht();
546    /// let peer_id = dht_service
547    ///     .get_providers("identification", "example")
548    ///     .await
549    ///     .unwrap()
550    ///     .next()
551    ///     .await
552    ///     .unwrap()
553    ///     .unwrap();
554    ///
555    /// let mut req_resp_service = connection.req_resp();
556    /// let response = req_resp_service
557    ///     .send_request(peer_id, "Hello, world!", None)
558    ///     .await
559    ///     .unwrap();
560    ///
561    /// let data = Vec::try_from(response).unwrap();
562    /// println!("Received response: {}", String::from_utf8(data).unwrap());
563    /// # }
564    /// ```
565    #[must_use]
566    pub fn req_resp(&self) -> ReqRespService {
567        ReqRespService::new(self)
568    }
569
570    /// Returns a handle to the request-response service with JSON-encoded requests and responses.
571    ///
572    /// # Example
573    ///
574    /// ```no_run
575    /// use futures::StreamExt as _;
576    /// use hyveos_sdk::Connection;
577    /// use serde::{Serialize, Deserialize};
578    ///
579    /// #[derive(Debug, Serialize, Deserialize)]
580    /// struct ExampleRequest {
581    ///    message: String,
582    /// }
583    ///
584    /// #[derive(Debug, Serialize, Deserialize)]
585    /// struct ExampleResponse {
586    ///    message: String,
587    /// }
588    ///
589    /// # #[tokio::main]
590    /// # async fn main() {
591    /// let connection = Connection::new().await.unwrap();
592    /// let mut dht_service = connection.dht();
593    /// let peer_id = dht_service
594    ///     .get_providers("identification", "example")
595    ///     .await
596    ///     .unwrap()
597    ///     .next()
598    ///     .await
599    ///     .unwrap()
600    ///     .unwrap();
601    ///
602    /// let mut req_resp_service = connection.req_resp_json();
603    /// let request = ExampleRequest { message: "Hello, world!".to_string() };
604    /// let response = req_resp_service
605    ///     .send_request(peer_id, &request, None)
606    ///     .await
607    ///     .unwrap();
608    ///
609    /// let data: ExampleResponse = Result::from(response).unwrap();
610    /// println!("Received response: {data:?}");
611    /// # }
612    /// ```
613    #[cfg(feature = "json")]
614    #[must_use]
615    pub fn req_resp_json<Req, Resp>(&self) -> JsonReqRespService<Req, Resp>
616    where
617        Req: Serialize + DeserializeOwned,
618        Resp: Serialize + DeserializeOwned,
619    {
620        JsonReqRespService::new(self)
621    }
622
623    /// Returns a handle to the request-response service with JSON-encoded requests and responses.
624    ///
625    /// # Example
626    ///
627    /// ```no_run
628    /// use futures::StreamExt as _;
629    /// use hyveos_sdk::Connection;
630    /// use serde::{Serialize, Deserialize};
631    ///
632    /// #[derive(Debug, Serialize, Deserialize)]
633    /// struct ExampleRequest {
634    ///    message: String,
635    /// }
636    ///
637    /// #[derive(Debug, Serialize, Deserialize)]
638    /// struct ExampleResponse {
639    ///    message: String,
640    /// }
641    ///
642    /// # #[tokio::main]
643    /// # async fn main() {
644    /// let connection = Connection::new().await.unwrap();
645    /// let mut dht_service = connection.dht();
646    /// let peer_id = dht_service
647    ///     .get_providers("identification", "example")
648    ///     .await
649    ///     .unwrap()
650    ///     .next()
651    ///     .await
652    ///     .unwrap()
653    ///     .unwrap();
654    ///
655    /// let mut req_resp_service = connection.req_resp_cbor();
656    /// let request = ExampleRequest { message: "Hello, world!".to_string() };
657    /// let response = req_resp_service
658    ///     .send_request(peer_id, &request, None)
659    ///     .await
660    ///     .unwrap();
661    ///
662    /// let data: ExampleResponse = Result::from(response).unwrap();
663    /// println!("Received response: {data:?}");
664    /// # }
665    /// ```
666    #[cfg(feature = "cbor")]
667    #[must_use]
668    pub fn req_resp_cbor<Req, Resp>(&self) -> CborReqRespService<Req, Resp>
669    where
670        Req: Serialize + DeserializeOwned,
671        Resp: Serialize + DeserializeOwned,
672    {
673        CborReqRespService::new(self)
674    }
675
676    /// Returns a handle to the scripting service.
677    ///
678    /// # Example
679    ///
680    /// ```no_run
681    /// use hyveos_sdk::{Connection, services::ScriptingConfig};
682    ///
683    /// # #[tokio::main]
684    /// # async fn main() {
685    /// let connection = Connection::new().await.unwrap();
686    /// let mut scripting_service = connection.scripting();
687    ///
688    /// let config = ScriptingConfig::new("my-docker-image:latest")
689    ///     .local()
690    ///     .expose_port(8080);
691    /// let script_id = scripting_service.deploy_script(config).await.unwrap();
692    ///
693    /// println!("Deployed script with id on self: {script_id}");
694    /// # }
695    /// ```
696    #[doc(hidden)]
697    #[cfg(feature = "scripting")]
698    #[must_use]
699    pub fn scripting(&self) -> ScriptingService {
700        ScriptingService::new(self)
701    }
702}