indradb_proto/
client.rs

1use std::convert::TryInto;
2use std::error::Error as StdError;
3use std::fmt;
4use std::sync::{Arc, Mutex};
5
6use crate::ConversionError;
7
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::ReceiverStream;
10use tokio_stream::StreamExt;
11use tonic::transport::{Channel, Endpoint, Error as TonicTransportError};
12use tonic::{Request, Status};
13use uuid::Uuid;
14
/// Capacity of the bounded channel that `Client::bulk_insert` uses to feed
/// items into its outgoing request stream.
const CHANNEL_CAPACITY: usize = 100;
16
17/// The error returned if a client operation failed.
18#[derive(Debug)]
19pub enum ClientError {
20    /// Conversion between an IndraDB and its protobuf equivalent failed.
21    Conversion { inner: ConversionError },
22    /// A gRPC error.
23    Grpc { inner: Status },
24    /// A transport error.
25    Transport { inner: TonicTransportError },
26    /// The gRPC channel has been closed.
27    ChannelClosed,
28}
29
30impl StdError for ClientError {
31    fn source(&self) -> Option<&(dyn StdError + 'static)> {
32        match *self {
33            ClientError::Conversion { ref inner } => Some(inner),
34            ClientError::Grpc { ref inner } => Some(inner),
35            ClientError::Transport { ref inner } => Some(inner),
36            _ => None,
37        }
38    }
39}
40
41impl fmt::Display for ClientError {
42    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43        match *self {
44            ClientError::Conversion { ref inner } => inner.fmt(f),
45            ClientError::Grpc { ref inner } => write!(f, "grpc error: {inner}"),
46            ClientError::Transport { ref inner } => write!(f, "transport error: {inner}"),
47            ClientError::ChannelClosed => write!(f, "failed to send request: channel closed"),
48        }
49    }
50}
51
52impl From<ConversionError> for ClientError {
53    fn from(err: ConversionError) -> Self {
54        ClientError::Conversion { inner: err }
55    }
56}
57
58impl From<Status> for ClientError {
59    fn from(err: Status) -> Self {
60        ClientError::Grpc { inner: err }
61    }
62}
63
64impl From<TonicTransportError> for ClientError {
65    fn from(err: TonicTransportError) -> Self {
66        ClientError::Transport { inner: err }
67    }
68}
69
70impl<T> From<mpsc::error::SendError<T>> for ClientError {
71    fn from(_: mpsc::error::SendError<T>) -> Self {
72        ClientError::ChannelClosed
73    }
74}
75
76/// A higher-level client implementation.
77///
78/// This should be better suited than the low-level client auto-generated by
79/// gRPC/tonic in virtually every case, unless you want to avoid the cost of
80/// translating between protobuf types and their IndraDB equivalents. The
81/// interface is designed to resemble `indradb::Database`, but async.
82#[derive(Clone)]
83pub struct Client(crate::ProtoClient<Channel>);
84
85impl Client {
86    /// Creates a new client.
87    ///
88    /// # Arguments
89    /// * `endpoint`: The server endpoint.
90    pub async fn new(endpoint: Endpoint) -> Result<Self, ClientError> {
91        let client = crate::ProtoClient::connect(endpoint).await?;
92        Ok(Client(client))
93    }
94
95    /// Pings the server.
96    pub async fn ping(&mut self) -> Result<(), ClientError> {
97        self.0.ping(()).await?;
98        Ok(())
99    }
100
101    /// Syncs persisted content. Depending on the datastore implementation,
102    /// this has different meanings - including potentially being a no-op.
103    pub async fn sync(&mut self) -> Result<(), ClientError> {
104        self.0.sync(()).await?;
105        Ok(())
106    }
107
108    /// Creates a new vertex. Returns whether the vertex was successfully
109    /// created - if this is false, it's because a vertex with the same UUID
110    /// already exists.
111    ///
112    /// # Arguments
113    /// * `vertex`: The vertex to create.
114    pub async fn create_vertex(&mut self, vertex: &indradb::Vertex) -> Result<bool, ClientError> {
115        let vertex: crate::Vertex = vertex.clone().into();
116        let res = self.0.create_vertex(vertex).await?;
117        Ok(res.into_inner().created)
118    }
119
120    /// Creates a new vertex with just a type specification. As opposed to
121    /// `create_vertex`, this is used when you do not want to manually specify
122    /// the vertex's UUID. Returns the new vertex's UUID.
123    ///
124    /// # Arguments
125    /// * `t`: The type of the vertex to create.
126    pub async fn create_vertex_from_type(&mut self, t: indradb::Identifier) -> Result<Uuid, ClientError> {
127        let t: crate::Identifier = t.into();
128        let res = self.0.create_vertex_from_type(t).await?;
129        Ok(res.into_inner().try_into()?)
130    }
131
132    /// Creates a new edge. If the edge already exists, this will update it
133    /// with a new update datetime. Returns whether the edge was successfully
134    /// created - if this is false, it's because one of the specified vertices
135    /// is missing.
136    ///
137    /// # Arguments
138    /// * `edge`: The edge to create.
139    pub async fn create_edge(&mut self, edge: &indradb::Edge) -> Result<bool, ClientError> {
140        let edge: crate::Edge = edge.clone().into();
141        let res = self.0.create_edge(edge).await?;
142        Ok(res.into_inner().created)
143    }
144
145    /// Gets values specified by a query.
146    ///
147    /// # Arguments
148    /// * `q`: The query to run.
149    pub async fn get<Q: Into<indradb::Query>>(&mut self, q: Q) -> Result<Vec<indradb::QueryOutputValue>, ClientError> {
150        let q: crate::Query = q.into().into();
151        let mut output = Vec::<indradb::QueryOutputValue>::new();
152        let mut res = self.0.get(q).await?.into_inner();
153        while let Some(res) = res.next().await {
154            output.push(res?.try_into()?);
155        }
156        Ok(output)
157    }
158
159    /// Deletes values specified by a query.
160    ///
161    /// # Arguments
162    /// * `q`: The query to run.
163    pub async fn delete<Q: Into<indradb::Query>>(&mut self, q: Q) -> Result<(), ClientError> {
164        let q: crate::Query = q.into().into();
165        self.0.delete(q).await?;
166        Ok(())
167    }
168
169    /// Sets properties.
170    ///
171    /// # Arguments
172    /// * `q`: The query to run.
173    /// * `name`: The property name.
174    /// * `value`: The property value.
175    pub async fn set_properties<Q: Into<indradb::Query>>(
176        &mut self,
177        q: Q,
178        name: indradb::Identifier,
179        value: &indradb::Json,
180    ) -> Result<(), ClientError> {
181        let name: crate::Identifier = name.into();
182        let value: crate::Json = value.clone().into();
183        let req = Request::new(crate::SetPropertiesRequest {
184            q: Some(q.into().into()),
185            name: name.into(),
186            value: value.clone().into(),
187        });
188        self.0.set_properties(req).await?;
189        Ok(())
190    }
191
192    /// Bulk inserts many vertices, edges, and/or properties.
193    ///
194    /// Note that datastores have discretion on how to approach safeguard vs
195    /// performance tradeoffs. In particular:
196    /// * If the datastore is disk-backed, it may or may not flush before
197    ///   returning.
198    /// * The datastore might not verify for correctness; e.g., it might not
199    ///   ensure that the relevant vertices exist before inserting an edge.
200    ///   If you want maximum protection, use the equivalent functions in
201    ///   transactions, which will provide more safeguards.
202    ///
203    /// # Arguments
204    /// * `items`: The items to insert.
205    pub async fn bulk_insert(&mut self, items: Vec<indradb::BulkInsertItem>) -> Result<(), ClientError> {
206        let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY);
207        let last_err: Arc<Mutex<Option<ClientError>>> = Arc::new(Mutex::new(None));
208
209        {
210            let last_err = last_err.clone();
211            tokio::spawn(async move {
212                for item in items.into_iter() {
213                    if let Err(err) = tx.send(item.into()).await {
214                        *last_err.lock().unwrap() = Some(err.into());
215                        return;
216                    }
217                }
218            });
219        }
220
221        self.0.bulk_insert(Request::new(ReceiverStream::new(rx))).await?;
222
223        let mut last_err = last_err.lock().unwrap();
224        if last_err.is_some() {
225            Err(last_err.take().unwrap())
226        } else {
227            Ok(())
228        }
229    }
230
231    pub async fn index_property(&mut self, name: indradb::Identifier) -> Result<(), ClientError> {
232        let request = Request::new(crate::IndexPropertyRequest {
233            name: Some(name.into()),
234        });
235        self.0.index_property(request).await?;
236        Ok(())
237    }
238
239    pub async fn execute_plugin(&mut self, name: &str, arg: indradb::Json) -> Result<indradb::Json, ClientError> {
240        let req = Request::new(crate::ExecutePluginRequest {
241            name: name.to_string(),
242            arg: Some(arg.into()),
243        });
244        let res = self.0.execute_plugin(req).await?;
245        match res.into_inner().value {
246            Some(value) => Ok(value.try_into()?),
247            None => Ok(indradb::Json::new(serde_json::Value::Null)),
248        }
249    }
250}