1use std::convert::TryInto;
2use std::error::Error as StdError;
3use std::fmt;
4use std::sync::{Arc, Mutex};
5
6use crate::ConversionError;
7
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::ReceiverStream;
10use tokio_stream::StreamExt;
11use tonic::transport::{Channel, Endpoint, Error as TonicTransportError};
12use tonic::{Request, Status};
13use uuid::Uuid;
14
// Capacity of the bounded `mpsc` channel that `Client::bulk_insert` uses to
// feed items to the outgoing request stream.
const CHANNEL_CAPACITY: usize = 100;
16
/// The error type returned by [`Client`] operations.
#[derive(Debug)]
pub enum ClientError {
    /// A value could not be converted; wraps the crate's `ConversionError`.
    Conversion { inner: ConversionError },
    /// A gRPC call returned an error; wraps the `tonic::Status`.
    Grpc { inner: Status },
    /// A transport-level failure; wraps the `tonic::transport::Error`.
    Transport { inner: TonicTransportError },
    /// An item could not be sent because the receiving side of an internal
    /// `mpsc` channel was closed (produced from `mpsc::error::SendError`).
    ChannelClosed,
}
29
30impl StdError for ClientError {
31 fn source(&self) -> Option<&(dyn StdError + 'static)> {
32 match *self {
33 ClientError::Conversion { ref inner } => Some(inner),
34 ClientError::Grpc { ref inner } => Some(inner),
35 ClientError::Transport { ref inner } => Some(inner),
36 _ => None,
37 }
38 }
39}
40
41impl fmt::Display for ClientError {
42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43 match *self {
44 ClientError::Conversion { ref inner } => inner.fmt(f),
45 ClientError::Grpc { ref inner } => write!(f, "grpc error: {inner}"),
46 ClientError::Transport { ref inner } => write!(f, "transport error: {inner}"),
47 ClientError::ChannelClosed => write!(f, "failed to send request: channel closed"),
48 }
49 }
50}
51
52impl From<ConversionError> for ClientError {
53 fn from(err: ConversionError) -> Self {
54 ClientError::Conversion { inner: err }
55 }
56}
57
58impl From<Status> for ClientError {
59 fn from(err: Status) -> Self {
60 ClientError::Grpc { inner: err }
61 }
62}
63
64impl From<TonicTransportError> for ClientError {
65 fn from(err: TonicTransportError) -> Self {
66 ClientError::Transport { inner: err }
67 }
68}
69
70impl<T> From<mpsc::error::SendError<T>> for ClientError {
71 fn from(_: mpsc::error::SendError<T>) -> Self {
72 ClientError::ChannelClosed
73 }
74}
75
/// A client that wraps the crate's generated `ProtoClient` over a tonic
/// `Channel`, exposing methods that take and return `indradb` types.
#[derive(Clone)]
pub struct Client(crate::ProtoClient<Channel>);
84
85impl Client {
86 pub async fn new(endpoint: Endpoint) -> Result<Self, ClientError> {
91 let client = crate::ProtoClient::connect(endpoint).await?;
92 Ok(Client(client))
93 }
94
95 pub async fn ping(&mut self) -> Result<(), ClientError> {
97 self.0.ping(()).await?;
98 Ok(())
99 }
100
101 pub async fn sync(&mut self) -> Result<(), ClientError> {
104 self.0.sync(()).await?;
105 Ok(())
106 }
107
108 pub async fn create_vertex(&mut self, vertex: &indradb::Vertex) -> Result<bool, ClientError> {
115 let vertex: crate::Vertex = vertex.clone().into();
116 let res = self.0.create_vertex(vertex).await?;
117 Ok(res.into_inner().created)
118 }
119
120 pub async fn create_vertex_from_type(&mut self, t: indradb::Identifier) -> Result<Uuid, ClientError> {
127 let t: crate::Identifier = t.into();
128 let res = self.0.create_vertex_from_type(t).await?;
129 Ok(res.into_inner().try_into()?)
130 }
131
132 pub async fn create_edge(&mut self, edge: &indradb::Edge) -> Result<bool, ClientError> {
140 let edge: crate::Edge = edge.clone().into();
141 let res = self.0.create_edge(edge).await?;
142 Ok(res.into_inner().created)
143 }
144
145 pub async fn get<Q: Into<indradb::Query>>(&mut self, q: Q) -> Result<Vec<indradb::QueryOutputValue>, ClientError> {
150 let q: crate::Query = q.into().into();
151 let mut output = Vec::<indradb::QueryOutputValue>::new();
152 let mut res = self.0.get(q).await?.into_inner();
153 while let Some(res) = res.next().await {
154 output.push(res?.try_into()?);
155 }
156 Ok(output)
157 }
158
159 pub async fn delete<Q: Into<indradb::Query>>(&mut self, q: Q) -> Result<(), ClientError> {
164 let q: crate::Query = q.into().into();
165 self.0.delete(q).await?;
166 Ok(())
167 }
168
169 pub async fn set_properties<Q: Into<indradb::Query>>(
176 &mut self,
177 q: Q,
178 name: indradb::Identifier,
179 value: &indradb::Json,
180 ) -> Result<(), ClientError> {
181 let name: crate::Identifier = name.into();
182 let value: crate::Json = value.clone().into();
183 let req = Request::new(crate::SetPropertiesRequest {
184 q: Some(q.into().into()),
185 name: name.into(),
186 value: value.clone().into(),
187 });
188 self.0.set_properties(req).await?;
189 Ok(())
190 }
191
192 pub async fn bulk_insert(&mut self, items: Vec<indradb::BulkInsertItem>) -> Result<(), ClientError> {
206 let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY);
207 let last_err: Arc<Mutex<Option<ClientError>>> = Arc::new(Mutex::new(None));
208
209 {
210 let last_err = last_err.clone();
211 tokio::spawn(async move {
212 for item in items.into_iter() {
213 if let Err(err) = tx.send(item.into()).await {
214 *last_err.lock().unwrap() = Some(err.into());
215 return;
216 }
217 }
218 });
219 }
220
221 self.0.bulk_insert(Request::new(ReceiverStream::new(rx))).await?;
222
223 let mut last_err = last_err.lock().unwrap();
224 if last_err.is_some() {
225 Err(last_err.take().unwrap())
226 } else {
227 Ok(())
228 }
229 }
230
231 pub async fn index_property(&mut self, name: indradb::Identifier) -> Result<(), ClientError> {
232 let request = Request::new(crate::IndexPropertyRequest {
233 name: Some(name.into()),
234 });
235 self.0.index_property(request).await?;
236 Ok(())
237 }
238
239 pub async fn execute_plugin(&mut self, name: &str, arg: indradb::Json) -> Result<indradb::Json, ClientError> {
240 let req = Request::new(crate::ExecutePluginRequest {
241 name: name.to_string(),
242 arg: Some(arg.into()),
243 });
244 let res = self.0.execute_plugin(req).await?;
245 match res.into_inner().value {
246 Some(value) => Ok(value.try_into()?),
247 None => Ok(indradb::Json::new(serde_json::Value::Null)),
248 }
249 }
250}