1use std::sync::Arc;
2
3use crate::perspective_proxy::PerspectiveProxy;
4use crate::types::{LinkExpression, Perspective};
5use crate::util::{create_websocket_client, query, query_raw};
6use crate::ClientInfo;
7use anyhow::{anyhow, Context, Result};
8use chrono::naive::NaiveDateTime;
9use futures::StreamExt;
10use graphql_client::{GraphQLQuery, Response};
11use graphql_ws_client::graphql::StreamingOperation;
12use serde_json::Value;
13
// Module-local alias for chrono's `NaiveDateTime`; it is the type of the
// `from_date` / `until_date` arguments of `query_links` in this file.
// NOTE(review): presumably the `GraphQLQuery` derives also resolve the schema's
// `DateTime` scalar through this alias — confirm against schema.gql.
type DateTime = NaiveDateTime;
15
16use self::add_link::AddLinkPerspectiveAddLink;
17use self::all::AllPerspectives;
18
19#[derive(GraphQLQuery)]
20#[graphql(
21 schema_path = "schema.gql",
22 query_path = "src/perspectives.gql",
23 response_derives = "Debug"
24)]
25pub struct All;
26
27pub async fn all(executor_url: String, cap_token: String) -> Result<Vec<AllPerspectives>> {
28 let response_data: all::ResponseData =
29 query(executor_url, cap_token, All::build_query(all::Variables {}))
30 .await
31 .with_context(|| "Failed to run perspectives->all query")?;
32 Ok(response_data.perspectives)
33}
34
35#[derive(GraphQLQuery)]
36#[graphql(
37 schema_path = "schema.gql",
38 query_path = "src/perspectives.gql",
39 response_derives = "Debug"
40)]
41pub struct Add;
42
43pub async fn add(executor_url: String, cap_token: String, name: String) -> Result<String> {
44 let response_data: add::ResponseData = query(
45 executor_url,
46 cap_token,
47 Add::build_query(add::Variables { name }),
48 )
49 .await
50 .with_context(|| "Failed to run perspectives->add query")?;
51 Ok(response_data.perspective_add.uuid)
52}
53
54#[derive(GraphQLQuery)]
55#[graphql(
56 schema_path = "schema.gql",
57 query_path = "src/perspectives.gql",
58 response_derives = "Debug"
59)]
60pub struct Remove;
61
62pub async fn remove(executor_url: String, cap_token: String, uuid: String) -> Result<()> {
63 let response: remove::ResponseData = query(
64 executor_url,
65 cap_token,
66 Remove::build_query(remove::Variables { uuid }),
67 )
68 .await
69 .with_context(|| "Failed to run perspectives->remove query")?;
70 if response.perspective_remove {
71 Ok(())
72 } else {
73 Err(anyhow!("Failed to remove perspective"))
74 }
75}
76
77#[derive(GraphQLQuery)]
78#[graphql(
79 schema_path = "schema.gql",
80 query_path = "src/perspectives.gql",
81 response_derives = "Debug"
82)]
83pub struct AddLink;
84
85pub async fn add_link(
86 executor_url: String,
87 cap_token: String,
88 uuid: String,
89 source: String,
90 target: String,
91 predicate: Option<String>,
92 status: Option<String>,
93) -> Result<AddLinkPerspectiveAddLink> {
94 let response_data: add_link::ResponseData = query(
95 executor_url,
96 cap_token,
97 AddLink::build_query(add_link::Variables {
98 uuid,
99 link: add_link::LinkInput {
100 source,
101 target,
102 predicate,
103 },
104 status,
105 }),
106 )
107 .await
108 .with_context(|| "Failed to run perspectives->addLink query")?;
109
110 Ok(response_data.perspective_add_link)
111}
112
113#[derive(GraphQLQuery)]
114#[graphql(
115 schema_path = "schema.gql",
116 query_path = "src/perspectives.gql",
117 response_derives = "Debug"
118)]
119pub struct RemoveLink;
120
121pub async fn remove_link(
122 executor_url: String,
123 cap_token: String,
124 uuid: String,
125 link: LinkExpression,
126) -> Result<()> {
127 let response_data: remove_link::ResponseData = query(
128 executor_url,
129 cap_token,
130 RemoveLink::build_query(remove_link::Variables {
131 uuid,
132 link: remove_link::LinkExpressionInput {
133 author: link.author,
134 timestamp: link.timestamp,
135 data: remove_link::LinkInput {
136 source: link.data.source,
137 target: link.data.target,
138 predicate: link.data.predicate,
139 },
140 proof: remove_link::ExpressionProofInput {
141 signature: link.proof.signature,
142 key: link.proof.key,
143 invalid: link.proof.invalid,
144 valid: link.proof.valid,
145 },
146 status: link.status,
147 },
148 }),
149 )
150 .await
151 .with_context(|| "Failed to run perspectives->removeLink query")?;
152
153 if response_data.perspective_remove_link {
154 Ok(())
155 } else {
156 Err(anyhow!("Failed to remove link"))
157 }
158}
159
160#[derive(GraphQLQuery)]
161#[graphql(
162 schema_path = "schema.gql",
163 query_path = "src/perspectives.gql",
164 response_derives = "Debug"
165)]
166pub struct QueryLinks;
167
168#[allow(clippy::too_many_arguments)]
169pub async fn query_links(
170 executor_url: String,
171 cap_token: String,
172 uuid: String,
173 source: Option<String>,
174 target: Option<String>,
175 predicate: Option<String>,
176 from_date: Option<DateTime>,
177 until_date: Option<DateTime>,
178 limit: Option<f64>,
179) -> Result<Vec<query_links::QueryLinksPerspectiveQueryLinks>> {
180 let response_data: query_links::ResponseData = query(
181 executor_url,
182 cap_token,
183 QueryLinks::build_query(query_links::Variables {
184 uuid,
185 query: query_links::LinkQuery {
186 source,
187 target,
188 predicate,
189 from_date,
190 until_date,
191 limit,
192 },
193 }),
194 )
195 .await
196 .with_context(|| "Failed to run perspectives->queryLinks query")?;
197
198 Ok(response_data.perspective_query_links.unwrap_or_default())
199}
200
201#[derive(GraphQLQuery)]
202#[graphql(
203 schema_path = "schema.gql",
204 query_path = "src/perspectives.gql",
205 response_derives = "Debug"
206)]
207pub struct Infer;
208
209pub async fn infer(
210 executor_url: String,
211 cap_token: String,
212 uuid: String,
213 prolog_query: String,
214) -> Result<Value> {
215 let response: Response<infer::ResponseData> = query_raw(
216 executor_url,
217 cap_token,
218 Infer::build_query(infer::Variables {
219 uuid,
220 query: prolog_query,
221 }),
222 )
223 .await?;
224
225 if let Some(data) = response.data {
226 let v: Value = serde_json::from_str(&data.perspective_query_prolog)?;
227 Ok(match v {
228 Value::String(string) => {
229 if string == "true" {
230 Value::Bool(true)
231 } else if string == "false" {
232 Value::Bool(false)
233 } else {
234 Value::String(string)
235 }
236 }
237 _ => v,
238 })
239 } else {
240 if let Some(errors) = response.errors.clone() {
241 if let Some(error) = errors.first() {
242 if error.message.starts_with("error(") {
243 return Err(anyhow!(error.message.clone()));
244 }
245 }
246 }
247 Err(anyhow!(
248 "Failed to run perspective->infer query: {:?}",
249 response.errors
250 ))
251 }
252}
253
254#[derive(GraphQLQuery)]
255#[graphql(
256 schema_path = "schema.gql",
257 query_path = "src/perspectives.gql",
258 response_derives = "Debug"
259)]
260pub struct SubscriptionLinkAdded;
261
262pub async fn watch(
263 executor_url: String,
264 cap_token: String,
265 id: String,
266 link_callback: Box<dyn Fn(LinkExpression)>,
267) -> Result<()> {
268 let mut client = create_websocket_client(executor_url, cap_token)
269 .await
270 .with_context(|| "Failed to create websocket client")?;
271
272 let mut stream = client
273 .streaming_operation(StreamingOperation::<SubscriptionLinkAdded>::new(
274 subscription_link_added::Variables { uuid: id.clone() },
275 ))
276 .await
277 .with_context(|| "Failed to subscribe to perspectiveLinkAdded")?;
278
279 println!(
280 "Successfully subscribed to perspectiveLinkAdded for perspective {}",
281 id
282 );
283 println!("Waiting for events...");
284
285 while let Some(item) = stream.next().await {
286 match item {
287 Ok(response) => {
288 if let Some(link) = response.data.and_then(|data| data.perspective_link_added) {
289 link_callback(link.into())
290 }
291 }
292 Err(e) => {
293 println!("Received Error: {:?}", e);
294 }
295 }
296 }
297
298 println!("Stream ended. Exiting...");
299
300 Ok(())
301}
302
303#[derive(GraphQLQuery)]
304#[graphql(
305 schema_path = "schema.gql",
306 query_path = "src/perspectives.gql",
307 response_derives = "Debug"
308)]
309pub struct Snapshot;
310
311pub async fn snapshot(
312 executor_url: String,
313 cap_token: String,
314 uuid: String,
315) -> Result<Perspective> {
316 let response: snapshot::ResponseData = query(
317 executor_url,
318 cap_token,
319 Snapshot::build_query(snapshot::Variables { uuid }),
320 )
321 .await
322 .with_context(|| "Failed to run perspectives->snapshot query")?;
323 Ok(response
324 .perspective_snapshot
325 .ok_or_else(|| anyhow!("No perspective found"))?
326 .into())
327}
328
329#[derive(Clone)]
330pub struct PerspectivesClient {
331 info: Arc<ClientInfo>,
332}
333
334impl PerspectivesClient {
335 pub fn new(info: Arc<ClientInfo>) -> Self {
336 Self { info }
337 }
338
339 pub async fn all(&self) -> Result<Vec<AllPerspectives>> {
340 all(self.info.executor_url.clone(), self.info.cap_token.clone()).await
341 }
342
343 pub async fn add(&self, name: String) -> Result<String> {
344 add(
345 self.info.executor_url.clone(),
346 self.info.cap_token.clone(),
347 name,
348 )
349 .await
350 }
351
352 pub async fn remove(&self, uuid: String) -> Result<()> {
353 remove(
354 self.info.executor_url.clone(),
355 self.info.cap_token.clone(),
356 uuid,
357 )
358 .await
359 }
360
361 pub async fn add_link(
362 &self,
363 uid: String,
364 source: String,
365 target: String,
366 predicate: Option<String>,
367 status: Option<String>,
368 ) -> Result<AddLinkPerspectiveAddLink> {
369 add_link(
370 self.info.executor_url.clone(),
371 self.info.cap_token.clone(),
372 uid,
373 source,
374 target,
375 predicate,
376 status,
377 )
378 .await
379 }
380
381 pub async fn remove_link(&self, uid: String, link: LinkExpression) -> Result<()> {
382 remove_link(
383 self.info.executor_url.clone(),
384 self.info.cap_token.clone(),
385 uid,
386 link,
387 )
388 .await
389 }
390
391 #[allow(clippy::too_many_arguments)]
392 pub async fn query_links(
393 &self,
394 uuid: String,
395 source: Option<String>,
396 target: Option<String>,
397 predicate: Option<String>,
398 from_date: Option<DateTime>,
399 until_date: Option<DateTime>,
400 limit: Option<f64>,
401 ) -> Result<Vec<query_links::QueryLinksPerspectiveQueryLinks>> {
402 query_links(
403 self.info.executor_url.clone(),
404 self.info.cap_token.clone(),
405 uuid,
406 source,
407 target,
408 predicate,
409 from_date,
410 until_date,
411 limit,
412 )
413 .await
414 }
415
416 pub async fn infer(&self, uuid: String, prolog_query: String) -> Result<Value> {
417 infer(
418 self.info.executor_url.clone(),
419 self.info.cap_token.clone(),
420 uuid,
421 prolog_query,
422 )
423 .await
424 }
425
426 pub async fn watch(
427 &self,
428 id: String,
429 link_callback: Box<dyn Fn(LinkExpression)>,
430 ) -> Result<()> {
431 watch(
432 self.info.executor_url.clone(),
433 self.info.cap_token.clone(),
434 id,
435 link_callback,
436 )
437 .await
438 }
439
440 pub async fn snapshot(&self, uuid: String) -> Result<Perspective> {
441 snapshot(
442 self.info.executor_url.clone(),
443 self.info.cap_token.clone(),
444 uuid,
445 )
446 .await
447 }
448
449 pub async fn get(&self, uuid: String) -> Result<PerspectiveProxy> {
450 self.all()
451 .await?
452 .iter()
453 .find(|p| p.uuid == uuid)
454 .ok_or_else(|| anyhow!("Perspective with ID {} not found!", uuid))?;
455
456 Ok(PerspectiveProxy::new(self.clone(), uuid.clone()))
457 }
458}