ad4m_client/
perspectives.rs

1use std::sync::Arc;
2
3use crate::perspective_proxy::PerspectiveProxy;
4use crate::types::{LinkExpression, Perspective};
5use crate::util::{create_websocket_client, query, query_raw};
6use crate::ClientInfo;
7use anyhow::{anyhow, Context, Result};
8use chrono::naive::NaiveDateTime;
9use futures::StreamExt;
10use graphql_client::{GraphQLQuery, Response};
11use graphql_ws_client::graphql::StreamingOperation;
12use serde_json::Value;
13
14type DateTime = NaiveDateTime;
15
16use self::add_link::AddLinkPerspectiveAddLink;
17use self::all::AllPerspectives;
18
19#[derive(GraphQLQuery)]
20#[graphql(
21    schema_path = "schema.gql",
22    query_path = "src/perspectives.gql",
23    response_derives = "Debug"
24)]
25pub struct All;
26
27pub async fn all(executor_url: String, cap_token: String) -> Result<Vec<AllPerspectives>> {
28    let response_data: all::ResponseData =
29        query(executor_url, cap_token, All::build_query(all::Variables {}))
30            .await
31            .with_context(|| "Failed to run perspectives->all query")?;
32    Ok(response_data.perspectives)
33}
34
35#[derive(GraphQLQuery)]
36#[graphql(
37    schema_path = "schema.gql",
38    query_path = "src/perspectives.gql",
39    response_derives = "Debug"
40)]
41pub struct Add;
42
43pub async fn add(executor_url: String, cap_token: String, name: String) -> Result<String> {
44    let response_data: add::ResponseData = query(
45        executor_url,
46        cap_token,
47        Add::build_query(add::Variables { name }),
48    )
49    .await
50    .with_context(|| "Failed to run perspectives->add query")?;
51    Ok(response_data.perspective_add.uuid)
52}
53
54#[derive(GraphQLQuery)]
55#[graphql(
56    schema_path = "schema.gql",
57    query_path = "src/perspectives.gql",
58    response_derives = "Debug"
59)]
60pub struct Remove;
61
62pub async fn remove(executor_url: String, cap_token: String, uuid: String) -> Result<()> {
63    let response: remove::ResponseData = query(
64        executor_url,
65        cap_token,
66        Remove::build_query(remove::Variables { uuid }),
67    )
68    .await
69    .with_context(|| "Failed to run perspectives->remove query")?;
70    if response.perspective_remove {
71        Ok(())
72    } else {
73        Err(anyhow!("Failed to remove perspective"))
74    }
75}
76
77#[derive(GraphQLQuery)]
78#[graphql(
79    schema_path = "schema.gql",
80    query_path = "src/perspectives.gql",
81    response_derives = "Debug"
82)]
83pub struct AddLink;
84
85pub async fn add_link(
86    executor_url: String,
87    cap_token: String,
88    uuid: String,
89    source: String,
90    target: String,
91    predicate: Option<String>,
92    status: Option<String>,
93) -> Result<AddLinkPerspectiveAddLink> {
94    let response_data: add_link::ResponseData = query(
95        executor_url,
96        cap_token,
97        AddLink::build_query(add_link::Variables {
98            uuid,
99            link: add_link::LinkInput {
100                source,
101                target,
102                predicate,
103            },
104            status,
105        }),
106    )
107    .await
108    .with_context(|| "Failed to run perspectives->addLink query")?;
109
110    Ok(response_data.perspective_add_link)
111}
112
113#[derive(GraphQLQuery)]
114#[graphql(
115    schema_path = "schema.gql",
116    query_path = "src/perspectives.gql",
117    response_derives = "Debug"
118)]
119pub struct RemoveLink;
120
121pub async fn remove_link(
122    executor_url: String,
123    cap_token: String,
124    uuid: String,
125    link: LinkExpression,
126) -> Result<()> {
127    let response_data: remove_link::ResponseData = query(
128        executor_url,
129        cap_token,
130        RemoveLink::build_query(remove_link::Variables {
131            uuid,
132            link: remove_link::LinkExpressionInput {
133                author: link.author,
134                timestamp: link.timestamp,
135                data: remove_link::LinkInput {
136                    source: link.data.source,
137                    target: link.data.target,
138                    predicate: link.data.predicate,
139                },
140                proof: remove_link::ExpressionProofInput {
141                    signature: link.proof.signature,
142                    key: link.proof.key,
143                    invalid: link.proof.invalid,
144                    valid: link.proof.valid,
145                },
146                status: link.status,
147            },
148        }),
149    )
150    .await
151    .with_context(|| "Failed to run perspectives->removeLink query")?;
152
153    if response_data.perspective_remove_link {
154        Ok(())
155    } else {
156        Err(anyhow!("Failed to remove link"))
157    }
158}
159
160#[derive(GraphQLQuery)]
161#[graphql(
162    schema_path = "schema.gql",
163    query_path = "src/perspectives.gql",
164    response_derives = "Debug"
165)]
166pub struct QueryLinks;
167
168#[allow(clippy::too_many_arguments)]
169pub async fn query_links(
170    executor_url: String,
171    cap_token: String,
172    uuid: String,
173    source: Option<String>,
174    target: Option<String>,
175    predicate: Option<String>,
176    from_date: Option<DateTime>,
177    until_date: Option<DateTime>,
178    limit: Option<f64>,
179) -> Result<Vec<query_links::QueryLinksPerspectiveQueryLinks>> {
180    let response_data: query_links::ResponseData = query(
181        executor_url,
182        cap_token,
183        QueryLinks::build_query(query_links::Variables {
184            uuid,
185            query: query_links::LinkQuery {
186                source,
187                target,
188                predicate,
189                from_date,
190                until_date,
191                limit,
192            },
193        }),
194    )
195    .await
196    .with_context(|| "Failed to run perspectives->queryLinks query")?;
197
198    Ok(response_data.perspective_query_links.unwrap_or_default())
199}
200
201#[derive(GraphQLQuery)]
202#[graphql(
203    schema_path = "schema.gql",
204    query_path = "src/perspectives.gql",
205    response_derives = "Debug"
206)]
207pub struct Infer;
208
209pub async fn infer(
210    executor_url: String,
211    cap_token: String,
212    uuid: String,
213    prolog_query: String,
214) -> Result<Value> {
215    let response: Response<infer::ResponseData> = query_raw(
216        executor_url,
217        cap_token,
218        Infer::build_query(infer::Variables {
219            uuid,
220            query: prolog_query,
221        }),
222    )
223    .await?;
224
225    if let Some(data) = response.data {
226        let v: Value = serde_json::from_str(&data.perspective_query_prolog)?;
227        Ok(match v {
228            Value::String(string) => {
229                if string == "true" {
230                    Value::Bool(true)
231                } else if string == "false" {
232                    Value::Bool(false)
233                } else {
234                    Value::String(string)
235                }
236            }
237            _ => v,
238        })
239    } else {
240        if let Some(errors) = response.errors.clone() {
241            if let Some(error) = errors.first() {
242                if error.message.starts_with("error(") {
243                    return Err(anyhow!(error.message.clone()));
244                }
245            }
246        }
247        Err(anyhow!(
248            "Failed to run perspective->infer query: {:?}",
249            response.errors
250        ))
251    }
252}
253
254#[derive(GraphQLQuery)]
255#[graphql(
256    schema_path = "schema.gql",
257    query_path = "src/perspectives.gql",
258    response_derives = "Debug"
259)]
260pub struct SubscriptionLinkAdded;
261
262pub async fn watch(
263    executor_url: String,
264    cap_token: String,
265    id: String,
266    link_callback: Box<dyn Fn(LinkExpression)>,
267) -> Result<()> {
268    let mut client = create_websocket_client(executor_url, cap_token)
269        .await
270        .with_context(|| "Failed to create websocket client")?;
271
272    let mut stream = client
273        .streaming_operation(StreamingOperation::<SubscriptionLinkAdded>::new(
274            subscription_link_added::Variables { uuid: id.clone() },
275        ))
276        .await
277        .with_context(|| "Failed to subscribe to perspectiveLinkAdded")?;
278
279    println!(
280        "Successfully subscribed to perspectiveLinkAdded for perspective {}",
281        id
282    );
283    println!("Waiting for events...");
284
285    while let Some(item) = stream.next().await {
286        match item {
287            Ok(response) => {
288                if let Some(link) = response.data.and_then(|data| data.perspective_link_added) {
289                    link_callback(link.into())
290                }
291            }
292            Err(e) => {
293                println!("Received Error: {:?}", e);
294            }
295        }
296    }
297
298    println!("Stream ended. Exiting...");
299
300    Ok(())
301}
302
303#[derive(GraphQLQuery)]
304#[graphql(
305    schema_path = "schema.gql",
306    query_path = "src/perspectives.gql",
307    response_derives = "Debug"
308)]
309pub struct Snapshot;
310
311pub async fn snapshot(
312    executor_url: String,
313    cap_token: String,
314    uuid: String,
315) -> Result<Perspective> {
316    let response: snapshot::ResponseData = query(
317        executor_url,
318        cap_token,
319        Snapshot::build_query(snapshot::Variables { uuid }),
320    )
321    .await
322    .with_context(|| "Failed to run perspectives->snapshot query")?;
323    Ok(response
324        .perspective_snapshot
325        .ok_or_else(|| anyhow!("No perspective found"))?
326        .into())
327}
328
329#[derive(Clone)]
330pub struct PerspectivesClient {
331    info: Arc<ClientInfo>,
332}
333
334impl PerspectivesClient {
335    pub fn new(info: Arc<ClientInfo>) -> Self {
336        Self { info }
337    }
338
339    pub async fn all(&self) -> Result<Vec<AllPerspectives>> {
340        all(self.info.executor_url.clone(), self.info.cap_token.clone()).await
341    }
342
343    pub async fn add(&self, name: String) -> Result<String> {
344        add(
345            self.info.executor_url.clone(),
346            self.info.cap_token.clone(),
347            name,
348        )
349        .await
350    }
351
352    pub async fn remove(&self, uuid: String) -> Result<()> {
353        remove(
354            self.info.executor_url.clone(),
355            self.info.cap_token.clone(),
356            uuid,
357        )
358        .await
359    }
360
361    pub async fn add_link(
362        &self,
363        uid: String,
364        source: String,
365        target: String,
366        predicate: Option<String>,
367        status: Option<String>,
368    ) -> Result<AddLinkPerspectiveAddLink> {
369        add_link(
370            self.info.executor_url.clone(),
371            self.info.cap_token.clone(),
372            uid,
373            source,
374            target,
375            predicate,
376            status,
377        )
378        .await
379    }
380
381    pub async fn remove_link(&self, uid: String, link: LinkExpression) -> Result<()> {
382        remove_link(
383            self.info.executor_url.clone(),
384            self.info.cap_token.clone(),
385            uid,
386            link,
387        )
388        .await
389    }
390
391    #[allow(clippy::too_many_arguments)]
392    pub async fn query_links(
393        &self,
394        uuid: String,
395        source: Option<String>,
396        target: Option<String>,
397        predicate: Option<String>,
398        from_date: Option<DateTime>,
399        until_date: Option<DateTime>,
400        limit: Option<f64>,
401    ) -> Result<Vec<query_links::QueryLinksPerspectiveQueryLinks>> {
402        query_links(
403            self.info.executor_url.clone(),
404            self.info.cap_token.clone(),
405            uuid,
406            source,
407            target,
408            predicate,
409            from_date,
410            until_date,
411            limit,
412        )
413        .await
414    }
415
416    pub async fn infer(&self, uuid: String, prolog_query: String) -> Result<Value> {
417        infer(
418            self.info.executor_url.clone(),
419            self.info.cap_token.clone(),
420            uuid,
421            prolog_query,
422        )
423        .await
424    }
425
426    pub async fn watch(
427        &self,
428        id: String,
429        link_callback: Box<dyn Fn(LinkExpression)>,
430    ) -> Result<()> {
431        watch(
432            self.info.executor_url.clone(),
433            self.info.cap_token.clone(),
434            id,
435            link_callback,
436        )
437        .await
438    }
439
440    pub async fn snapshot(&self, uuid: String) -> Result<Perspective> {
441        snapshot(
442            self.info.executor_url.clone(),
443            self.info.cap_token.clone(),
444            uuid,
445        )
446        .await
447    }
448
449    pub async fn get(&self, uuid: String) -> Result<PerspectiveProxy> {
450        self.all()
451            .await?
452            .iter()
453            .find(|p| p.uuid == uuid)
454            .ok_or_else(|| anyhow!("Perspective with ID {} not found!", uuid))?;
455
456        Ok(PerspectiveProxy::new(self.clone(), uuid.clone()))
457    }
458}