1use taganak_core::{default_graph_view, prelude::*};
19
20use futures::{Stream, StreamExt, TryFutureExt};
21
22use taganak_core::graphs::{GraphError, GraphView};
23use taganak_core::terms::{BlankNode, Literal};
24
25use std::collections::{HashMap, HashSet};
26use std::pin::pin;
27
28use iref::IriRefBuf;
29use thiserror::Error;
30
31#[derive(Debug, Error, PartialEq)]
33pub enum GraphORMError {
34 #[error("Failed to deserialize from graph: {0}")]
35 FailedDeserialize(String),
36 #[error("Failed to serialize to graph: {0}")]
37 FailedSerialize(String),
38 #[error("Failed to generate schema: {0}")]
39 FailedSchema(String),
40}
41
42#[derive(Default, Debug, Clone, PartialEq)]
47pub enum SubjectMode {
48 #[default]
51 Generated,
52 Static(Arc<Term>),
55}
56
57#[derive(Debug, Clone, PartialEq)]
64pub struct GraphORMMeta<T>
65where
66 T: std::fmt::Debug + PartialEq,
67{
68 subject_mode: SubjectMode,
69 thing: T,
70}
71
72impl<T> core::ops::Deref for GraphORMMeta<T>
73where
74 T: std::fmt::Debug + PartialEq,
75{
76 type Target = T;
77
78 fn deref(&self) -> &Self::Target {
79 &self.thing
80 }
81}
82
83impl<T> core::ops::DerefMut for GraphORMMeta<T>
84where
85 T: std::fmt::Debug + PartialEq,
86{
87 fn deref_mut(&mut self) -> &mut Self::Target {
88 &mut self.thing
89 }
90}
91
92impl<T> GraphORMMeta<T>
93where
94 T: GraphORM + Unpin + std::fmt::Debug + PartialEq + Sync + Send,
95{
96 pub fn new(thing: T) -> Self {
100 Self {
101 subject_mode: SubjectMode::default(),
102 thing,
103 }
104 }
105
106 pub fn into_deref(self) -> T {
114 self.thing
115 }
116
117 pub fn with_subject(mut self, subject: Arc<Term>) -> Self {
121 self.subject_mode = SubjectMode::Static(subject);
122 self
123 }
124
125 pub fn fixate_subject(&mut self) -> Result<(), GraphORMError> {
133 match self.subject_mode {
134 SubjectMode::Generated => {
135 self.subject_mode = SubjectMode::Static(self.thing.rdf_subject()?);
136 }
137 SubjectMode::Static(_) => {}
138 };
139
140 Ok(())
141 }
142
143 pub fn subject(&self) -> Result<Arc<Term>, GraphORMError> {
144 match &self.subject_mode {
145 SubjectMode::Generated => self.thing.rdf_subject(),
146 SubjectMode::Static(s) => Ok(s.clone()),
147 }
148 }
149
150 pub fn serialize_stream(
155 &self,
156 ) -> Result<impl Stream<Item = Arc<Triple>> + Unpin + Send + use<'_, T>, GraphORMError> {
157 self.thing.serialize_stream(self.subject()?)
158 }
159
160 pub async fn serialize(&self, graph: &mut impl Graph) -> Result<(), GraphORMError> {
165 graph
166 .merge_stream(
167 self.stream()
168 .await
169 .map_err(|e| todo!("find matching error kind"))?,
170 )
171 .await
172 .map_err(|e| todo!("find matching error kind"))?;
173 Ok(())
174 }
175}
176
177impl<T> Graph for GraphORMMeta<T>
178where
179 T: GraphORM + Unpin + std::fmt::Debug + PartialEq + Sync + Send,
180{
181 type View<'g>
182 = &'g Self
183 where
184 Self: 'g;
185
186 async fn view(&self) -> Self::View<'_> {
187 self
188 }
189}
190
191impl<'a, T> GraphView for &'a GraphORMMeta<T>
192where
193 T: GraphORM + Unpin + std::fmt::Debug + PartialEq + Sync + Send + 'a,
194{
195 type Stream =
196 core::pin::Pin<Box<dyn Stream<Item = Result<Arc<Triple>, GraphError>> + Unpin + Send + 'a>>;
197 async fn stream(self) -> Result<Self::Stream, GraphError> {
198 Ok(Box::pin(
199 self.serialize_stream()
200 .map_err(|e| todo!("find matching error kind") as GraphError)?
201 .map(|t| Ok(t)),
202 ))
203 }
204 default_graph_view!();
205}
206
207pub trait GraphORM {
208 fn rdf_subject(&self) -> Result<Arc<Term>, GraphORMError> {
217 Ok(Arc::new(Term::BlankNode(BlankNode::new())))
218 }
219
220 fn rdf_type() -> Arc<Term> {
224 todo!("default impl -> rdfs:Resource")
225 }
226
227 async fn serialize_schema(graph: &mut impl Graph) -> Result<(), GraphORMError>;
237
238 fn serialize_stream(
240 &self,
241 subject: Arc<Term>,
242 ) -> Result<impl Stream<Item = Arc<Triple>> + Unpin + Send, GraphORMError>;
243
244 async fn deserialize(
246 graph: impl GraphView,
247 subject: &Term,
248 ) -> Result<GraphORMMeta<Self>, GraphORMError>
249 where
250 Self: Sized + std::fmt::Debug + PartialEq;
251
252 fn into_meta(self) -> GraphORMMeta<Self>
254 where
255 Self: Sized + Unpin + std::fmt::Debug + PartialEq + Sync + Send,
256 {
257 GraphORMMeta::new(self)
258 }
259}
260
261#[cfg(feature = "derive")]
262mod magic {
263 use async_stream::stream;
264 use futures::stream;
265 use std::collections::{BTreeMap, BTreeSet};
266
267 #[doc(inline)]
268 pub use taganak_orm_derive::*;
269
270 pub mod re {
271 pub use async_stream;
273 pub use futures;
274 pub use taganak_core::prelude::*;
275 pub use tracing::trace;
276 }
277 use super::*;
278
279 pub trait GraphORMField {
294 fn orm_triples(
296 &self,
297 subject: Option<&Term>,
298 predicate: Option<&Term>,
299 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError>;
300
301 async fn orm_field_from_graph(
307 graph: impl GraphView,
308 subject: &Term,
309 predicate: &Term,
310 object: Option<&Term>,
311 ) -> Result<Self, GraphORMError>
312 where
313 Self: Sized;
314 }
315
316 impl<F> GraphORMField for GraphORMMeta<F>
317 where
318 F: GraphORMField + GraphORM + core::fmt::Debug + Unpin + Eq + Sync + Send,
319 {
320 fn orm_triples(
321 &self,
322 subject: Option<&Term>,
323 predicate: Option<&Term>,
324 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
325 Ok(Box::pin(stream! {
326 let subject = self.subject().unwrap();
327 let mut stream = pin!((**self).orm_triples(Some(subject.as_ref()), predicate).unwrap());
328 while let Some(triple) = stream.next().await {
329 yield triple;
330 }
331 }))
332 }
333
334 async fn orm_field_from_graph(
335 graph: impl GraphView,
336 subject: &Term,
337 predicate: &Term,
338 object: Option<&Term>,
339 ) -> Result<Self, GraphORMError>
340 where
341 Self: Sized,
342 {
343 let mut meta = GraphORMMeta::new(
344 F::orm_field_from_graph(graph.clone(), subject, predicate, object).await?,
345 );
346
347 let matches_errfn = |e| {
348 GraphORMError::FailedDeserialize(format!(
349 "failed to query graph for subject {subject}"
350 ))
351 };
352 if let Some(Ok(t)) = pin!(graph
353 .matches((Some(subject), Some(predicate), object), None)
354 .await
355 .map_err(matches_errfn)?
356 .stream()
357 .await
358 .map_err(matches_errfn)?)
359 .next()
360 .await
361 {
362 meta = meta.with_subject(t.object_arc().clone());
363 }
364
365 Ok(meta)
366 }
367 }
368
369 impl<F> GraphORMField for Option<F>
370 where
371 F: GraphORMField,
372 {
373 fn orm_triples(
374 &self,
375 subject: Option<&Term>,
376 predicate: Option<&Term>,
377 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
378 Ok(Box::pin(stream! {
379 if let Some(f) = self {
380 let mut stream = pin!(f.orm_triples(subject, predicate).unwrap());
381 while let Some(triple) = stream.next().await {
382 yield triple;
383 }
384 }
385 }))
386 }
387
388 async fn orm_field_from_graph(
389 graph: impl GraphView,
390 subject: &Term,
391 predicate: &Term,
392 object: Option<&Term>,
393 ) -> Result<Self, GraphORMError>
394 where
395 Self: Sized,
396 {
397 let matches_errfn = |e| {
398 GraphORMError::FailedDeserialize(format!(
399 "failed to query graph for subject {subject}"
400 ))
401 };
402 if pin!(graph
403 .clone()
404 .matches((Some(subject), Some(predicate), object), None)
405 .await
406 .map_err(matches_errfn)?
407 .stream()
408 .await
409 .map_err(matches_errfn)?)
410 .next()
411 .await
412 .is_some()
413 {
414 Ok(Some(
415 F::orm_field_from_graph(graph, subject, predicate, object).await?,
416 ))
417 } else {
418 Ok(None)
419 }
420 }
421 }
422
423 impl<F> GraphORMField for Arc<F>
424 where
425 F: GraphORMField,
426 {
427 fn orm_triples(
428 &self,
429 subject: Option<&Term>,
430 predicate: Option<&Term>,
431 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
432 Ok(Box::pin(stream! {
433 let mut stream = pin!((**self).orm_triples(subject, predicate).unwrap());
434 while let Some(triple) = stream.next().await {
435 yield triple;
436 }
437 }))
438 }
439
440 async fn orm_field_from_graph(
441 graph: impl GraphView,
442 subject: &Term,
443 predicate: &Term,
444 object: Option<&Term>,
445 ) -> Result<Self, GraphORMError>
446 where
447 Self: Sized,
448 {
449 Ok(Arc::new(
450 F::orm_field_from_graph(graph, subject, predicate, object).await?,
451 ))
452 }
453 }
454
455 impl<F> GraphORMField for HashSet<F>
456 where
457 F: GraphORMField + Eq + std::hash::Hash,
458 {
459 fn orm_triples(
460 &self,
461 subject: Option<&Term>,
462 predicate: Option<&Term>,
463 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
464 Ok(Box::pin(async_stream::stream! {
465 for v in self {
467 let mut stream = pin!(v.orm_triples(subject, predicate).unwrap());
468 while let Some(triple) = stream.next().await {
469 yield triple;
470 }
471 }
472 }))
473 }
474
475 async fn orm_field_from_graph(
476 graph: impl GraphView,
477 subject: &Term,
478 predicate: &Term,
479 object: Option<&Term>,
480 ) -> Result<Self, GraphORMError>
481 where
482 Self: Sized,
483 {
484 let mut set = HashSet::new();
485 let matches_errfn = |e| {
486 GraphORMError::FailedDeserialize(format!(
487 "failed to query graph for subject {subject}"
488 ))
489 };
490 let mut triples = pin!(graph
491 .clone()
492 .matches((Some(subject), Some(predicate), object), None)
493 .await
494 .map_err(matches_errfn)?
495 .stream()
496 .await
497 .map_err(matches_errfn)?);
498
499 while let Some(triple) = triples.next().await {
500 let triple = triple.expect("handle error streams, actually");
502 set.insert(
503 F::orm_field_from_graph(
504 graph.clone(),
505 subject,
506 predicate,
507 Some(triple.object()),
508 )
509 .await?,
510 );
511 }
512
513 Ok(set)
514 }
515 }
516
517 impl<F> GraphORMField for BTreeSet<F>
518 where
519 F: Ord + GraphORMField,
520 {
521 fn orm_triples(
522 &self,
523 subject: Option<&Term>,
524 predicate: Option<&Term>,
525 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
526 Ok(Box::pin(async_stream::stream! {
527 for v in self {
529 let mut stream = pin!(v.orm_triples(subject, predicate).unwrap());
530 while let Some(triple) = stream.next().await {
531 yield triple;
532 }
533 }
534 }))
535 }
536
537 async fn orm_field_from_graph(
538 graph: impl GraphView,
539 subject: &Term,
540 predicate: &Term,
541 object: Option<&Term>,
542 ) -> Result<Self, GraphORMError>
543 where
544 Self: Sized,
545 {
546 let matches_errfn = |e| {
547 GraphORMError::FailedDeserialize(format!(
548 "failed to query graph for subject {subject}"
549 ))
550 };
551 let mut set = BTreeSet::new();
552 let mut triples = pin!(graph
553 .clone()
554 .matches((Some(subject), Some(predicate), object), None)
555 .await
556 .map_err(matches_errfn)?
557 .stream()
558 .await
559 .map_err(matches_errfn)?);
560
561 while let Some(triple) = triples.next().await {
562 let triple = triple.expect("handle error streams suitably");
564 set.insert(
565 F::orm_field_from_graph(
566 graph.clone(),
567 subject,
568 predicate,
569 Some(triple.object()),
570 )
571 .await?,
572 );
573 }
574
575 Ok(set)
576 }
577 }
578
579 impl GraphORMField for Term {
580 fn orm_triples(
581 &self,
582 subject: Option<&Term>,
583 predicate: Option<&Term>,
584 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
585 Ok(Box::pin(stream! {
586 yield Triple::new(Arc::new(subject.unwrap().clone()), Arc::new(predicate.unwrap().clone()), Arc::new(self.clone())).unwrap();
587 }))
588 }
589
590 async fn orm_field_from_graph(
591 graph: impl GraphView,
592 subject: &Term,
593 predicate: &Term,
594 object: Option<&Term>,
595 ) -> Result<Self, GraphORMError>
596 where
597 Self: Sized,
598 {
599 let matches_errfn = |e| {
600 GraphORMError::FailedDeserialize(format!(
601 "failed to query graph for subject {subject}"
602 ))
603 };
604 Ok((*graph
605 .object(Some(subject), Some(predicate))
606 .await
607 .map_err(matches_errfn)?
608 .ok_or(GraphORMError::FailedDeserialize(format!(
609 "failed to query graph for subject {subject}"
610 )))?)
611 .clone())
612 }
613 }
614
615 impl<F> GraphORMField for Vec<F>
616 where
617 F: GraphORMField,
618 {
619 fn orm_triples(
620 &self,
621 subject: Option<&Term>,
622 predicate: Option<&Term>,
623 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
624 Ok(Box::pin(async_stream::stream! {
625 for v in self {
627 let mut stream = pin!(v.orm_triples(subject, predicate).unwrap());
628 while let Some(triple) = stream.next().await {
629 yield triple;
631 }
632 }
633 }))
634 }
635
636 async fn orm_field_from_graph(
637 graph: impl GraphView,
638 subject: &Term,
639 predicate: &Term,
640 object: Option<&Term>,
641 ) -> Result<Self, GraphORMError>
642 where
643 Self: Sized,
644 {
645 let mut items = Vec::new();
646
647 let matches_errfn = |e| {
648 GraphORMError::FailedDeserialize(format!(
649 "failed to query graph for subject {subject}"
650 ))
651 };
652
653 let rdf_first = Term::NamedNode(
654 Iri::new("http://www.w3.org/1999/02/22-rdf-syntax-ns#first".to_string()).unwrap(),
655 );
656 let rdf_rest = Term::NamedNode(
657 Iri::new("http://www.w3.org/1999/02/22-rdf-syntax-ns#rest".to_string()).unwrap(),
658 );
659 let rdf_nil = Term::NamedNode(
660 Iri::new("http://www.w3.org/1999/02/22-rdf-syntax-ns#nil".to_string()).unwrap(),
661 );
662
663 let mut current = (*graph
664 .clone()
665 .object(Some(subject), Some(predicate))
666 .await
667 .map_err(matches_errfn)?
668 .ok_or(GraphORMError::FailedDeserialize(format!(
669 "failed to query graph for subject {subject}"
670 )))?)
671 .clone();
672 let mut rest;
673 loop {
674 items.push(
675 F::orm_field_from_graph(graph.clone(), ¤t, &rdf_first, None).await?,
676 );
677
678 rest = graph
679 .clone()
680 .object(Some(¤t), Some(&rdf_rest))
681 .await
682 .map_err(matches_errfn)?
683 .ok_or(GraphORMError::FailedDeserialize(
684 "not a valid RDF collection".to_string(),
685 ))?;
686 if *rest == rdf_nil {
687 break;
688 } else {
689 current = (*rest).clone();
690 }
691 }
692
693 Ok(items)
694 }
695 }
696
697 impl<F> GraphORMField for HashMap<Arc<Term>, F>
698 where
699 F: GraphORMField + Eq + std::hash::Hash,
700 {
701 fn orm_triples(
702 &self,
703 subject: Option<&Term>,
704 predicate: Option<&Term>,
705 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
706 Ok(Box::pin(async_stream::stream! {
707 for v in self.values() {
709 let mut stream = pin!(v.orm_triples(subject, predicate).unwrap());
710 while let Some(triple) = stream.next().await {
711 yield triple;
712 }
713 }
714 }))
715 }
716
717 async fn orm_field_from_graph(
718 graph: impl GraphView,
719 subject: &Term,
720 predicate: &Term,
721 object: Option<&Term>,
722 ) -> Result<Self, GraphORMError>
723 where
724 Self: Sized,
725 {
726 let matches_errfn = |e| {
727 GraphORMError::FailedDeserialize(format!(
728 "failed to query graph for subject {subject}"
729 ))
730 };
731 let mut map = HashMap::new();
732 let mut triples = pin!(graph
733 .clone()
734 .matches((Some(subject), Some(predicate), object), None)
735 .await
736 .map_err(matches_errfn)?
737 .stream()
738 .await
739 .map_err(matches_errfn)?);
740
741 while let Some(triple) = triples.next().await {
742 let triple = triple.expect("handle error streams appropriately");
744 map.insert(
745 Arc::new(triple.object().clone()),
746 F::orm_field_from_graph(
747 graph.clone(),
748 subject,
749 predicate,
750 Some(triple.object()),
751 )
752 .await?,
753 );
754 }
755
756 Ok(map)
757 }
758 }
759 #[inline(always)]
764 pub async fn get_graph_orm_field<F: GraphORMField>(
765 graph: impl GraphView,
766 subject: &Term,
767 predicate: &Term,
768 object: Option<&Term>,
769 ) -> Result<F, GraphORMError> {
770 F::orm_field_from_graph(graph, subject, predicate, object).await
771 }
772
773 pub trait GraphORMValue: TryFrom<Literal> + Clone
774 where
775 Literal: for<'a> TryFrom<Self>,
776 {
777 }
778
779 impl GraphORMValue for u8 {}
780 impl GraphORMValue for u16 {}
781 impl GraphORMValue for u32 {}
782 impl GraphORMValue for u64 {}
783 impl GraphORMValue for u128 {}
784 impl GraphORMValue for i8 {}
785 impl GraphORMValue for i16 {}
786 impl GraphORMValue for i32 {}
787 impl GraphORMValue for i64 {}
788 impl GraphORMValue for f32 {}
789 impl GraphORMValue for f64 {}
790 impl GraphORMValue for i128 {}
791 impl GraphORMValue for String {}
792 impl GraphORMValue for IriRefBuf {}
793 impl GraphORMValue for bool {}
794 #[cfg(feature = "chrono")]
795 impl GraphORMValue for chrono::NaiveDate {}
796 #[cfg(feature = "chrono")]
797 impl GraphORMValue for chrono::NaiveTime {}
798 #[cfg(feature = "chrono")]
799 impl GraphORMValue for chrono::NaiveDateTime {}
800 #[cfg(feature = "decimal")]
801 impl GraphORMValue for rust_decimal::Decimal {}
802
803 impl<V> GraphORMField for V
804 where
805 V: GraphORMValue,
806 Literal: TryFrom<V>,
807 {
808 fn orm_triples(
809 &self,
810 subject: Option<&Term>,
811 predicate: Option<&Term>,
812 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
813 let subject = Arc::new(subject.unwrap().clone());
814 let predicate = Arc::new(predicate.unwrap().clone());
815
816 Ok(Box::pin(async_stream::stream! {
817 let object = Arc::new(Term::Literal(self.clone().try_into().map_err(|e| GraphORMError::FailedSerialize("failed to convert into literal".to_string())).unwrap()));
819 yield Triple::new(subject.clone(), predicate.clone(), object).unwrap();
820 }))
821 }
822
823 async fn orm_field_from_graph(
824 graph: impl GraphView,
825 subject: &Term,
826 predicate: &Term,
827 object: Option<&Term>,
828 ) -> Result<Self, GraphORMError>
829 where
830 Self: Sized,
831 {
832 let matches_errfn = |e| {
833 GraphORMError::FailedDeserialize(format!(
834 "failed to query graph for subject {subject}"
835 ))
836 };
837 pin!(graph
838 .matches((Some(subject), Some(predicate), object), Some(1))
839 .await
840 .map_err(matches_errfn)?
841 .stream()
842 .await
843 .map_err(matches_errfn)?)
844 .next()
845 .await
846 .ok_or(GraphORMError::FailedDeserialize(format!(
847 "predicate {} not found for subject {}",
848 predicate, subject
849 )))?
850 .unwrap() .object()
852 .to_literal()
853 .map_err(|e| GraphORMError::FailedDeserialize("object is not a literal".to_string()))?
854 .clone()
855 .try_into()
856 .map_err(|e| {
857 GraphORMError::FailedDeserialize("failed to parse value into literal".to_string())
858 })
859 }
860 }
861}
862
863#[cfg(feature = "derive")]
864#[doc(inline)]
865pub use self::magic::*;