1use taganak_core::{default_graph_view, prelude::*};
19
20use futures::{Stream, StreamExt, TryFutureExt};
21
22use taganak_core::graphs::{GraphError, GraphView};
23use taganak_core::terms::{BlankNode, Literal};
24
25use std::collections::{HashMap, HashSet};
26use std::pin::pin;
27
28use thiserror::Error;
29
/// Errors raised while mapping Rust values to and from graph triples.
///
/// Every variant carries a free-form, human-readable description of the failure.
#[derive(Debug, Error, PartialEq)]
pub enum GraphORMError {
    /// A value could not be reconstructed from the graph.
    #[error("Failed to deserialize from graph: {0}")]
    FailedDeserialize(String),
    /// A value could not be turned into triples.
    #[error("Failed to serialize to graph: {0}")]
    FailedSerialize(String),
    /// Generating the schema for a type failed.
    #[error("Failed to generate schema: {0}")]
    FailedSchema(String),
}
40
/// How a [`GraphORMMeta`] determines the subject term of the value it wraps.
#[derive(Default, Debug, Clone, PartialEq)]
pub enum SubjectMode {
    /// Ask the wrapped value for its subject (`GraphORM::rdf_subject`) whenever it is needed.
    #[default]
    Generated,
    /// Always use this fixed subject term.
    Static(Arc<Term>),
}
55
/// Wrapper that pairs a value with the way its subject term is determined.
///
/// The wrapper dereferences to the wrapped value (see the `Deref`/`DerefMut` impls).
#[derive(Debug, Clone, PartialEq)]
pub struct GraphORMMeta<T>
where
    T: std::fmt::Debug + PartialEq,
{
    // Whether the subject is fixed or obtained from `thing` on demand.
    subject_mode: SubjectMode,
    // The wrapped value.
    thing: T,
}
70
/// Read access to the wrapped value through the wrapper.
impl<T> core::ops::Deref for GraphORMMeta<T>
where
    T: std::fmt::Debug + PartialEq,
{
    type Target = T;

    /// Returns a shared reference to the wrapped value.
    fn deref(&self) -> &Self::Target {
        &self.thing
    }
}
81
/// Mutable access to the wrapped value through the wrapper.
impl<T> core::ops::DerefMut for GraphORMMeta<T>
where
    T: std::fmt::Debug + PartialEq,
{
    /// Returns an exclusive reference to the wrapped value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.thing
    }
}
90
91impl<T> GraphORMMeta<T>
92where
93 T: GraphORM + Unpin + std::fmt::Debug + PartialEq + Sync + Send,
94{
95 pub fn new(thing: T) -> Self {
99 Self {
100 subject_mode: SubjectMode::default(),
101 thing,
102 }
103 }
104
105 pub fn into_deref(self) -> T {
113 self.thing
114 }
115
116 pub fn with_subject(mut self, subject: Arc<Term>) -> Self {
120 self.subject_mode = SubjectMode::Static(subject);
121 self
122 }
123
124 pub fn fixate_subject(&mut self) -> Result<(), GraphORMError> {
132 match self.subject_mode {
133 SubjectMode::Generated => {
134 self.subject_mode = SubjectMode::Static(self.thing.rdf_subject()?);
135 }
136 SubjectMode::Static(_) => {}
137 };
138
139 Ok(())
140 }
141
142 pub fn subject(&self) -> Result<Arc<Term>, GraphORMError> {
143 match &self.subject_mode {
144 SubjectMode::Generated => self.thing.rdf_subject(),
145 SubjectMode::Static(s) => Ok(s.clone()),
146 }
147 }
148
149 pub fn serialize_stream(
154 &self,
155 ) -> Result<impl Stream<Item = Arc<Triple>> + Unpin + Send + use<'_, T>, GraphORMError> {
156 self.thing.serialize_stream(self.subject()?)
157 }
158
159 pub async fn serialize(&self, graph: &mut impl Graph) -> Result<(), GraphORMError> {
164 graph
165 .merge_stream(
166 self.stream()
167 .await
168 .map_err(|e| todo!("find matching error kind"))?,
169 )
170 .await
171 .map_err(|e| todo!("find matching error kind"))?;
172 Ok(())
173 }
174}
175
/// Lets a [`GraphORMMeta`] be used as a graph; its view is a shared reference to itself.
impl<T> Graph for GraphORMMeta<T>
where
    T: GraphORM + Unpin + std::fmt::Debug + PartialEq + Sync + Send,
{
    type View<'g>
        = &'g Self
    where
        Self: 'g;

    /// Returns `self` as the view.
    async fn view(&self) -> Self::View<'_> {
        self
    }
}
189
190impl<'a, T> GraphView for &'a GraphORMMeta<T>
191where
192 T: GraphORM + Unpin + std::fmt::Debug + PartialEq + Sync + Send + 'a,
193{
194 type Stream =
195 core::pin::Pin<Box<dyn Stream<Item = Result<Arc<Triple>, GraphError>> + Unpin + Send + 'a>>;
196 async fn stream(self) -> Result<Self::Stream, GraphError> {
197 Ok(Box::pin(
198 self.serialize_stream()
199 .map_err(|e| todo!("find matching error kind") as GraphError)?
200 .map(|t| Ok(t)),
201 ))
202 }
203 default_graph_view!();
204}
205
206pub trait GraphORM {
207 fn rdf_subject(&self) -> Result<Arc<Term>, GraphORMError> {
216 Ok(Arc::new(Term::BlankNode(BlankNode::new())))
217 }
218
219 fn rdf_type() -> Arc<Term> {
223 todo!("default impl -> rdfs:Resource")
224 }
225
226 async fn serialize_schema(graph: &mut impl Graph) -> Result<(), GraphORMError>;
236
237 fn serialize_stream(
239 &self,
240 subject: Arc<Term>,
241 ) -> Result<impl Stream<Item = Arc<Triple>> + Unpin + Send, GraphORMError>;
242
243 async fn deserialize(
245 graph: impl GraphView,
246 subject: &Term,
247 ) -> Result<GraphORMMeta<Self>, GraphORMError>
248 where
249 Self: Sized + std::fmt::Debug + PartialEq;
250
251 fn into_meta(self) -> GraphORMMeta<Self>
253 where
254 Self: Sized + Unpin + std::fmt::Debug + PartialEq + Sync + Send,
255 {
256 GraphORMMeta::new(self)
257 }
258}
259
260#[cfg(feature = "derive")]
261mod magic {
262 use async_stream::stream;
263 use futures::stream;
264 use std::collections::{BTreeMap, BTreeSet};
265
266 #[doc(inline)]
267 pub use taganak_orm_derive::*;
268
269 pub mod re {
270 pub use async_stream;
272 pub use futures;
273 pub use taganak_core::prelude::*;
274 pub use tracing::trace;
275 }
276 use super::*;
277
    /// A value that can be stored as the object side of one or more triples.
    pub trait GraphORMField {
        /// Produces the triples representing this field for the given `subject` and
        /// `predicate`.
        fn orm_triples(
            &self,
            subject: Option<&Term>,
            predicate: Option<&Term>,
        ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError>;

        /// Reads the field back from `graph` for `subject` and `predicate`.
        ///
        /// The collection implementations in this module pass `Some(object)` to select one
        /// specific matching triple.
        async fn orm_field_from_graph(
            graph: impl GraphView,
            subject: &Term,
            predicate: &Term,
            object: Option<&Term>,
        ) -> Result<Self, GraphORMError>
        where
            Self: Sized;
    }
314
315 impl<F> GraphORMField for GraphORMMeta<F>
316 where
317 F: GraphORMField + GraphORM + core::fmt::Debug + Unpin + Eq + Sync + Send,
318 {
319 fn orm_triples(
320 &self,
321 subject: Option<&Term>,
322 predicate: Option<&Term>,
323 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
324 Ok(Box::pin(stream! {
325 let subject = self.subject().unwrap();
326 let mut stream = pin!((**self).orm_triples(Some(subject.as_ref()), predicate).unwrap());
327 while let Some(triple) = stream.next().await {
328 yield triple;
329 }
330 }))
331 }
332
333 async fn orm_field_from_graph(
334 graph: impl GraphView,
335 subject: &Term,
336 predicate: &Term,
337 object: Option<&Term>,
338 ) -> Result<Self, GraphORMError>
339 where
340 Self: Sized,
341 {
342 let mut meta = GraphORMMeta::new(
343 F::orm_field_from_graph(graph.clone(), subject, predicate, object).await?,
344 );
345
346 let matches_errfn = |e| {
347 GraphORMError::FailedDeserialize(format!(
348 "failed to query graph for subject {subject}"
349 ))
350 };
351 if let Some(Ok(t)) = pin!(graph
352 .matches((Some(subject), Some(predicate), object), None)
353 .await
354 .map_err(matches_errfn)?
355 .stream()
356 .await
357 .map_err(matches_errfn)?)
358 .next()
359 .await
360 {
361 meta = meta.with_subject(t.object_arc().clone());
362 }
363
364 Ok(meta)
365 }
366 }
367
368 impl<F> GraphORMField for Option<F>
369 where
370 F: GraphORMField,
371 {
372 fn orm_triples(
373 &self,
374 subject: Option<&Term>,
375 predicate: Option<&Term>,
376 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
377 Ok(Box::pin(stream! {
378 if let Some(f) = self {
379 let mut stream = pin!(f.orm_triples(subject, predicate).unwrap());
380 while let Some(triple) = stream.next().await {
381 yield triple;
382 }
383 }
384 }))
385 }
386
387 async fn orm_field_from_graph(
388 graph: impl GraphView,
389 subject: &Term,
390 predicate: &Term,
391 object: Option<&Term>,
392 ) -> Result<Self, GraphORMError>
393 where
394 Self: Sized,
395 {
396 let matches_errfn = |e| {
397 GraphORMError::FailedDeserialize(format!(
398 "failed to query graph for subject {subject}"
399 ))
400 };
401 if pin!(graph
402 .clone()
403 .matches((Some(subject), Some(predicate), object), None)
404 .await
405 .map_err(matches_errfn)?
406 .stream()
407 .await
408 .map_err(matches_errfn)?)
409 .next()
410 .await
411 .is_some()
412 {
413 Ok(Some(
414 F::orm_field_from_graph(graph, subject, predicate, object).await?,
415 ))
416 } else {
417 Ok(None)
418 }
419 }
420 }
421
422 impl<F> GraphORMField for Arc<F>
423 where
424 F: GraphORMField,
425 {
426 fn orm_triples(
427 &self,
428 subject: Option<&Term>,
429 predicate: Option<&Term>,
430 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
431 Ok(Box::pin(stream! {
432 let mut stream = pin!((**self).orm_triples(subject, predicate).unwrap());
433 while let Some(triple) = stream.next().await {
434 yield triple;
435 }
436 }))
437 }
438
439 async fn orm_field_from_graph(
440 graph: impl GraphView,
441 subject: &Term,
442 predicate: &Term,
443 object: Option<&Term>,
444 ) -> Result<Self, GraphORMError>
445 where
446 Self: Sized,
447 {
448 Ok(Arc::new(
449 F::orm_field_from_graph(graph, subject, predicate, object).await?,
450 ))
451 }
452 }
453
454 impl<F> GraphORMField for HashSet<F>
455 where
456 F: GraphORMField + Eq + std::hash::Hash,
457 {
458 fn orm_triples(
459 &self,
460 subject: Option<&Term>,
461 predicate: Option<&Term>,
462 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
463 Ok(Box::pin(async_stream::stream! {
464 for v in self {
466 let mut stream = pin!(v.orm_triples(subject, predicate).unwrap());
467 while let Some(triple) = stream.next().await {
468 yield triple;
469 }
470 }
471 }))
472 }
473
474 async fn orm_field_from_graph(
475 graph: impl GraphView,
476 subject: &Term,
477 predicate: &Term,
478 object: Option<&Term>,
479 ) -> Result<Self, GraphORMError>
480 where
481 Self: Sized,
482 {
483 let mut set = HashSet::new();
484 let matches_errfn = |e| {
485 GraphORMError::FailedDeserialize(format!(
486 "failed to query graph for subject {subject}"
487 ))
488 };
489 let mut triples = pin!(graph
490 .clone()
491 .matches((Some(subject), Some(predicate), object), None)
492 .await
493 .map_err(matches_errfn)?
494 .stream()
495 .await
496 .map_err(matches_errfn)?);
497
498 while let Some(triple) = triples.next().await {
499 let triple = triple.expect("handle error streams, actually");
501 set.insert(
502 F::orm_field_from_graph(
503 graph.clone(),
504 subject,
505 predicate,
506 Some(triple.object()),
507 )
508 .await?,
509 );
510 }
511
512 Ok(set)
513 }
514 }
515
516 impl<F> GraphORMField for BTreeSet<F>
517 where
518 F: Ord + GraphORMField,
519 {
520 fn orm_triples(
521 &self,
522 subject: Option<&Term>,
523 predicate: Option<&Term>,
524 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
525 Ok(Box::pin(async_stream::stream! {
526 for v in self {
528 let mut stream = pin!(v.orm_triples(subject, predicate).unwrap());
529 while let Some(triple) = stream.next().await {
530 yield triple;
531 }
532 }
533 }))
534 }
535
536 async fn orm_field_from_graph(
537 graph: impl GraphView,
538 subject: &Term,
539 predicate: &Term,
540 object: Option<&Term>,
541 ) -> Result<Self, GraphORMError>
542 where
543 Self: Sized,
544 {
545 let matches_errfn = |e| {
546 GraphORMError::FailedDeserialize(format!(
547 "failed to query graph for subject {subject}"
548 ))
549 };
550 let mut set = BTreeSet::new();
551 let mut triples = pin!(graph
552 .clone()
553 .matches((Some(subject), Some(predicate), object), None)
554 .await
555 .map_err(matches_errfn)?
556 .stream()
557 .await
558 .map_err(matches_errfn)?);
559
560 while let Some(triple) = triples.next().await {
561 let triple = triple.expect("handle error streams suitably");
563 set.insert(
564 F::orm_field_from_graph(
565 graph.clone(),
566 subject,
567 predicate,
568 Some(triple.object()),
569 )
570 .await?,
571 );
572 }
573
574 Ok(set)
575 }
576 }
577
578 impl GraphORMField for Term {
579 fn orm_triples(
580 &self,
581 subject: Option<&Term>,
582 predicate: Option<&Term>,
583 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
584 Ok(Box::pin(stream! {
585 yield Triple::new(Arc::new(subject.unwrap().clone()), Arc::new(predicate.unwrap().clone()), Arc::new(self.clone())).unwrap();
586 }))
587 }
588
589 async fn orm_field_from_graph(
590 graph: impl GraphView,
591 subject: &Term,
592 predicate: &Term,
593 object: Option<&Term>,
594 ) -> Result<Self, GraphORMError>
595 where
596 Self: Sized,
597 {
598 let matches_errfn = |e| {
599 GraphORMError::FailedDeserialize(format!(
600 "failed to query graph for subject {subject}"
601 ))
602 };
603 Ok((*graph
604 .object(Some(subject), Some(predicate))
605 .await
606 .map_err(matches_errfn)?
607 .ok_or(GraphORMError::FailedDeserialize(format!(
608 "failed to query graph for subject {subject}"
609 )))?)
610 .clone())
611 }
612 }
613
614 impl<F> GraphORMField for Vec<F>
615 where
616 F: GraphORMField,
617 {
618 fn orm_triples(
619 &self,
620 subject: Option<&Term>,
621 predicate: Option<&Term>,
622 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
623 Ok(Box::pin(async_stream::stream! {
624 for v in self {
626 let mut stream = pin!(v.orm_triples(subject, predicate).unwrap());
627 while let Some(triple) = stream.next().await {
628 yield triple;
630 }
631 }
632 }))
633 }
634
635 async fn orm_field_from_graph(
636 graph: impl GraphView,
637 subject: &Term,
638 predicate: &Term,
639 object: Option<&Term>,
640 ) -> Result<Self, GraphORMError>
641 where
642 Self: Sized,
643 {
644 let mut items = Vec::new();
645
646 let matches_errfn = |e| {
647 GraphORMError::FailedDeserialize(format!(
648 "failed to query graph for subject {subject}"
649 ))
650 };
651
652 let rdf_first = Term::NamedNode(
653 Iri::new("http://www.w3.org/1999/02/22-rdf-syntax-ns#first".to_string()).unwrap(),
654 );
655 let rdf_rest = Term::NamedNode(
656 Iri::new("http://www.w3.org/1999/02/22-rdf-syntax-ns#rest".to_string()).unwrap(),
657 );
658 let rdf_nil = Term::NamedNode(
659 Iri::new("http://www.w3.org/1999/02/22-rdf-syntax-ns#nil".to_string()).unwrap(),
660 );
661
662 let mut current = (*graph
663 .clone()
664 .object(Some(subject), Some(predicate))
665 .await
666 .map_err(matches_errfn)?
667 .ok_or(GraphORMError::FailedDeserialize(format!(
668 "failed to query graph for subject {subject}"
669 )))?)
670 .clone();
671 let mut rest;
672 loop {
673 items.push(
674 F::orm_field_from_graph(graph.clone(), ¤t, &rdf_first, None).await?,
675 );
676
677 rest = graph
678 .clone()
679 .object(Some(¤t), Some(&rdf_rest))
680 .await
681 .map_err(matches_errfn)?
682 .ok_or(GraphORMError::FailedDeserialize(
683 "not a valid RDF collection".to_string(),
684 ))?;
685 if *rest == rdf_nil {
686 break;
687 } else {
688 current = (*rest).clone();
689 }
690 }
691
692 Ok(items)
693 }
694 }
695
696 impl<F> GraphORMField for HashMap<Arc<Term>, F>
697 where
698 F: GraphORMField + Eq + std::hash::Hash,
699 {
700 fn orm_triples(
701 &self,
702 subject: Option<&Term>,
703 predicate: Option<&Term>,
704 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
705 Ok(Box::pin(async_stream::stream! {
706 for v in self.values() {
708 let mut stream = pin!(v.orm_triples(subject, predicate).unwrap());
709 while let Some(triple) = stream.next().await {
710 yield triple;
711 }
712 }
713 }))
714 }
715
716 async fn orm_field_from_graph(
717 graph: impl GraphView,
718 subject: &Term,
719 predicate: &Term,
720 object: Option<&Term>,
721 ) -> Result<Self, GraphORMError>
722 where
723 Self: Sized,
724 {
725 let matches_errfn = |e| {
726 GraphORMError::FailedDeserialize(format!(
727 "failed to query graph for subject {subject}"
728 ))
729 };
730 let mut map = HashMap::new();
731 let mut triples = pin!(graph
732 .clone()
733 .matches((Some(subject), Some(predicate), object), None)
734 .await
735 .map_err(matches_errfn)?
736 .stream()
737 .await
738 .map_err(matches_errfn)?);
739
740 while let Some(triple) = triples.next().await {
741 let triple = triple.expect("handle error streams appropriately");
743 map.insert(
744 Arc::new(triple.object().clone()),
745 F::orm_field_from_graph(
746 graph.clone(),
747 subject,
748 predicate,
749 Some(triple.object()),
750 )
751 .await?,
752 );
753 }
754
755 Ok(map)
756 }
757 }
    /// Free-function form of [`GraphORMField::orm_field_from_graph`]; forwards all arguments
    /// unchanged, with the field type chosen through `F`.
    #[inline(always)]
    pub async fn get_graph_orm_field<F: GraphORMField>(
        graph: impl GraphView,
        subject: &Term,
        predicate: &Term,
        object: Option<&Term>,
    ) -> Result<F, GraphORMError> {
        F::orm_field_from_graph(graph, subject, predicate, object).await
    }
771
    /// Marker for plain values that convert to and from a [`Literal`].
    ///
    /// Every implementor gets [`GraphORMField`] through the blanket impl for
    /// `V: GraphORMValue`.
    pub trait GraphORMValue: TryFrom<Literal> + Clone
    where
        Literal: for<'a> TryFrom<Self>,
    {
    }

    // Unsigned integers.
    impl GraphORMValue for u8 {}
    impl GraphORMValue for u16 {}
    impl GraphORMValue for u32 {}
    impl GraphORMValue for u64 {}
    impl GraphORMValue for u128 {}
    // Signed integers and floats.
    impl GraphORMValue for i8 {}
    impl GraphORMValue for i16 {}
    impl GraphORMValue for i32 {}
    impl GraphORMValue for i64 {}
    impl GraphORMValue for f32 {}
    impl GraphORMValue for f64 {}
    impl GraphORMValue for i128 {}
    // Text, IRIs and booleans.
    impl GraphORMValue for String {}
    impl GraphORMValue for Iri {}
    impl GraphORMValue for bool {}
    // Date/time types, only with the `chrono` feature.
    #[cfg(feature = "chrono")]
    impl GraphORMValue for chrono::NaiveDate {}
    #[cfg(feature = "chrono")]
    impl GraphORMValue for chrono::NaiveTime {}
    #[cfg(feature = "chrono")]
    impl GraphORMValue for chrono::NaiveDateTime {}
    // Decimal numbers, only with the `decimal` feature.
    #[cfg(feature = "decimal")]
    impl GraphORMValue for rust_decimal::Decimal {}
801
802 impl<V> GraphORMField for V
803 where
804 V: GraphORMValue,
805 Literal: TryFrom<V>,
806 {
807 fn orm_triples(
808 &self,
809 subject: Option<&Term>,
810 predicate: Option<&Term>,
811 ) -> Result<impl Stream<Item = Arc<Triple>>, GraphORMError> {
812 let subject = Arc::new(subject.unwrap().clone());
813 let predicate = Arc::new(predicate.unwrap().clone());
814
815 Ok(Box::pin(async_stream::stream! {
816 let object = Arc::new(Term::Literal(self.clone().try_into().map_err(|e| GraphORMError::FailedSerialize("failed to convert into literal".to_string())).unwrap()));
818 yield Triple::new(subject.clone(), predicate.clone(), object).unwrap();
819 }))
820 }
821
822 async fn orm_field_from_graph(
823 graph: impl GraphView,
824 subject: &Term,
825 predicate: &Term,
826 object: Option<&Term>,
827 ) -> Result<Self, GraphORMError>
828 where
829 Self: Sized,
830 {
831 let matches_errfn = |e| {
832 GraphORMError::FailedDeserialize(format!(
833 "failed to query graph for subject {subject}"
834 ))
835 };
836 pin!(graph
837 .matches((Some(subject), Some(predicate), object), Some(1))
838 .await
839 .map_err(matches_errfn)?
840 .stream()
841 .await
842 .map_err(matches_errfn)?)
843 .next()
844 .await
845 .ok_or(GraphORMError::FailedDeserialize(format!(
846 "predicate {} not found for subject {}",
847 predicate, subject
848 )))?
849 .unwrap() .object()
851 .to_literal()
852 .map_err(|e| GraphORMError::FailedDeserialize("object is not a literal".to_string()))?
853 .clone()
854 .try_into()
855 .map_err(|e| {
856 GraphORMError::FailedDeserialize("failed to parse value into literal".to_string())
857 })
858 }
859 }
860}
861
862#[cfg(feature = "derive")]
863#[doc(inline)]
864pub use self::magic::*;