1use std::collections::HashSet;
4use std::convert::{TryFrom, TryInto};
5use std::fmt;
6use std::ops::Deref;
7use std::str::FromStr;
8
9use async_hash::{Digest, Hash, Output};
10use async_trait::async_trait;
11use destream::{de, en};
12use futures::{try_join, TryFutureExt};
13use get_size::GetSize;
14use get_size_derive::*;
15use log::debug;
16use safecast::{CastFrom, CastInto, Match, TryCastFrom, TryCastInto};
17
18use tc_error::*;
19use tc_transact::public::{Public, StateInstance, ToState};
20use tc_transact::Gateway;
21use tc_value::Value;
22use tcgeneric::*;
23
24use crate::{Link, Scalar, Scope, SELF};
25
26use super::{IdRef, OpDef, Refer, TCRef};
27
28const PREFIX: PathLabel = path_label(&["state", "scalar", "ref", "op"]);
29
30#[derive(Clone, Copy, Eq, PartialEq, Hash)]
32pub enum OpRefType {
33 Get,
34 Put,
35 Post,
36 Delete,
37}
38
39impl Class for OpRefType {}
40
41impl NativeClass for OpRefType {
42 fn from_path(path: &[PathSegment]) -> Option<Self> {
43 if path.len() == 5 && &path[..4] == &PREFIX[..] {
44 match path[4].as_str() {
45 "get" => Some(Self::Get),
46 "put" => Some(Self::Put),
47 "post" => Some(Self::Post),
48 "delete" => Some(Self::Delete),
49 _ => None,
50 }
51 } else {
52 None
53 }
54 }
55
56 fn path(&self) -> TCPathBuf {
57 let suffix = match self {
58 Self::Get => "get",
59 Self::Put => "put",
60 Self::Post => "post",
61 Self::Delete => "delete",
62 };
63
64 TCPathBuf::from(PREFIX).append(label(suffix))
65 }
66}
67
68impl fmt::Debug for OpRefType {
69 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70 match self {
71 Self::Get => write!(f, "GET"),
72 Self::Put => write!(f, "PUT"),
73 Self::Post => write!(f, "POST"),
74 Self::Delete => write!(f, "DELETE"),
75 }
76 }
77}
78
79#[derive(Clone, Eq, PartialEq, GetSize)]
81pub enum Subject {
82 Link(Link),
83 Ref(IdRef, TCPathBuf),
84}
85
86impl Subject {
87 fn dereference_self(self, path: &TCPathBuf) -> Self {
88 match self {
89 Self::Ref(id_ref, suffix) if id_ref.id() == &SELF => {
90 let mut path = path.clone();
91 path.extend(suffix);
92 Self::Link(path.into())
93 }
94 other => other,
95 }
96 }
97
98 fn reference_self(self, path: &TCPathBuf) -> Self {
99 match self {
100 Self::Link(link) if link.path().starts_with(path) => Self::Ref(
101 IdRef::from(SELF),
102 link.into_path().into_iter().skip(path.len()).collect(),
103 ),
104 other => other,
105 }
106 }
107
108 fn requires(&self, deps: &mut HashSet<Id>) {
109 match self {
110 Self::Ref(id_ref, _) if id_ref.id() != &SELF => {
111 deps.insert(id_ref.id().clone());
112 }
113 _ => {}
114 }
115 }
116}
117
118impl<'a, D: Digest> Hash<D> for &'a Subject {
119 fn hash(self) -> Output<D> {
120 match self {
121 Subject::Link(link) => Hash::<D>::hash(link),
122 Subject::Ref(id, path) => Hash::<D>::hash((id, path)),
123 }
124 }
125}
126
127impl From<IdRef> for Subject {
128 fn from(id_ref: IdRef) -> Self {
129 Self::Ref(id_ref, TCPathBuf::default())
130 }
131}
132
133impl FromStr for Subject {
134 type Err = TCError;
135
136 fn from_str(s: &str) -> TCResult<Self> {
137 if s.starts_with('$') {
138 if let Some(i) = s.find('/') {
139 let id_ref = IdRef::from_str(&s[..i])?;
140 let path = TCPathBuf::from_str(&s[i..])?;
141 Ok(Self::Ref(id_ref, path))
142 } else {
143 let id_ref = IdRef::from_str(s)?;
144 Ok(Self::Ref(id_ref, TCPathBuf::default()))
145 }
146 } else {
147 Link::from_str(s).map(Self::Link).map_err(TCError::from)
148 }
149 }
150}
151
152impl From<Subject> for Scalar {
153 fn from(subject: Subject) -> Self {
154 match subject {
155 Subject::Ref(id_ref, path) => Scalar::Tuple(Tuple::from((id_ref.into(), path.into()))),
156 Subject::Link(link) => Scalar::Value(link.into()),
157 }
158 }
159}
160
161#[async_trait]
162impl de::FromStream for Subject {
163 type Context = ();
164
165 async fn from_stream<D: de::Decoder>(context: (), d: &mut D) -> Result<Self, D::Error> {
166 let subject = String::from_stream(context, d).await?;
167 Subject::from_str(&subject).map_err(de::Error::custom)
168 }
169}
170
171impl<'en> en::ToStream<'en> for Subject {
172 fn to_stream<E: en::Encoder<'en>>(&'en self, e: E) -> Result<E::Ok, E::Error> {
173 match self {
174 Self::Link(link) => link.to_stream(e),
175 Self::Ref(id_ref, path) if path.is_empty() => id_ref.to_stream(e),
176 Self::Ref(id_ref, path) => {
177 en::IntoStream::into_stream(format!("{}{}", id_ref, path), e)
178 }
179 }
180 }
181}
182
183impl<'en> en::IntoStream<'en> for Subject {
184 fn into_stream<E: en::Encoder<'en>>(self, e: E) -> Result<E::Ok, E::Error> {
185 match self {
186 Self::Link(link) => link.into_stream(e),
187 Self::Ref(id_ref, path) if path.is_empty() => id_ref.into_stream(e),
188 Self::Ref(id_ref, path) => format!("{}{}", id_ref, path).into_stream(e),
189 }
190 }
191}
192
193impl TryCastFrom<Value> for Subject {
194 fn can_cast_from(value: &Value) -> bool {
195 match value {
196 Value::Link(_) => true,
197 Value::String(s) => Self::from_str(s.as_str()).is_ok(),
198 Value::Tuple(tuple) => tuple.matches::<(IdRef, TCPathBuf)>(),
199 _ => false,
200 }
201 }
202
203 fn opt_cast_from(value: Value) -> Option<Self> {
204 match value {
205 Value::Link(link) => Some(Self::Link(link)),
206 Value::String(s) => Self::from_str(s.as_str()).ok(),
207 Value::Tuple(tuple) => tuple
208 .opt_cast_into()
209 .map(|(id, path)| Self::from((id, path))),
210 _ => None,
211 }
212 }
213}
214
215impl TryCastFrom<Scalar> for Subject {
216 fn can_cast_from(scalar: &Scalar) -> bool {
217 match scalar {
218 Scalar::Ref(tc_ref) => match &**tc_ref {
219 TCRef::Id(_) => true,
220 _ => false,
221 },
222 Scalar::Value(value) => Self::can_cast_from(value),
223 Scalar::Tuple(tuple) => tuple.matches::<(IdRef, TCPathBuf)>(),
224 _ => false,
225 }
226 }
227
228 fn opt_cast_from(scalar: Scalar) -> Option<Self> {
229 match scalar {
230 Scalar::Ref(tc_ref) => match *tc_ref {
231 TCRef::Id(id_ref) => Some(Self::Ref(id_ref, TCPathBuf::default())),
232 _ => None,
233 },
234 Scalar::Value(value) => Self::opt_cast_from(value),
235 Scalar::Tuple(tuple) => tuple
236 .opt_cast_into()
237 .map(|(id, path)| Self::from((id, path))),
238
239 _ => None,
240 }
241 }
242}
243
244impl From<TCPathBuf> for Subject {
245 fn from(path: TCPathBuf) -> Self {
246 Self::Link(path.into())
247 }
248}
249
250impl From<Link> for Subject {
251 fn from(link: Link) -> Self {
252 Self::Link(link)
253 }
254}
255
256impl From<(IdRef, TCPathBuf)> for Subject {
257 fn from(get: (IdRef, TCPathBuf)) -> Self {
258 Self::Ref(get.0, get.1)
259 }
260}
261
262impl TryFrom<Subject> for Link {
263 type Error = TCError;
264
265 fn try_from(subject: Subject) -> TCResult<Self> {
266 match subject {
267 Subject::Link(link) => Ok(link),
268 other => Err(TCError::unexpected(other, "a Link")),
269 }
270 }
271}
272
273impl TryFrom<Subject> for TCPathBuf {
274 type Error = TCError;
275
276 fn try_from(subject: Subject) -> TCResult<Self> {
277 match subject {
278 Subject::Link(link) if link.host().is_none() => Ok(link.into_path()),
279 other => Err(TCError::unexpected(other, "a Path")),
280 }
281 }
282}
283
284impl fmt::Debug for Subject {
285 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
286 match self {
287 Self::Link(link) => fmt::Debug::fmt(link, f),
288 Self::Ref(id_ref, path) if path.is_empty() => fmt::Debug::fmt(id_ref, f),
289 Self::Ref(id_ref, path) => write!(f, "subject: {:?} {:?}", id_ref, path),
290 }
291 }
292}
293
294impl fmt::Display for Subject {
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296 match self {
297 Self::Link(link) => fmt::Display::fmt(link, f),
298 Self::Ref(id_ref, path) if path.is_empty() => fmt::Display::fmt(id_ref, f),
299 Self::Ref(id_ref, path) => write!(f, "{}{}", id_ref, path),
300 }
301 }
302}
303
304pub type GetRef = (Subject, Scalar);
306
307pub type PutRef = (Subject, Scalar, Scalar);
309
310pub type PostRef = (Subject, Map<Scalar>);
312
313pub type DeleteRef = (Subject, Scalar);
315
316#[derive(Clone, Eq, PartialEq, GetSize)]
318pub enum OpRef {
319 Get(GetRef),
320 Put(PutRef),
321 Post(PostRef),
322 Delete(DeleteRef),
323}
324
325impl Instance for OpRef {
326 type Class = OpRefType;
327
328 fn class(&self) -> OpRefType {
329 use OpRefType as ORT;
330 match self {
331 Self::Get(_) => ORT::Get,
332 Self::Put(_) => ORT::Put,
333 Self::Post(_) => ORT::Post,
334 Self::Delete(_) => ORT::Delete,
335 }
336 }
337}
338
339impl OpRef {
340 pub(crate) fn dereference_subject(self, path: &TCPathBuf) -> Self {
341 match self {
342 Self::Get((subject, key)) => Self::Get((subject.dereference_self(path), key)),
343 Self::Put((subject, key, value)) => {
344 Self::Put((subject.dereference_self(path), key, value))
345 }
346 Self::Post((subject, params)) => Self::Post((subject.dereference_self(path), params)),
347 Self::Delete((subject, key)) => Self::Delete((subject.dereference_self(path), key)),
348 }
349 }
350}
351
352#[async_trait]
353impl<State> Refer<State> for OpRef
354where
355 State: StateInstance + Refer<State> + From<Scalar>,
356 State::Closure: From<(Map<State>, OpDef)> + TryCastFrom<State>,
357 Map<State>: TryFrom<State, Error = TCError>,
358 Value: TryFrom<State, Error = TCError> + TryCastFrom<State>,
359 bool: TryCastFrom<State>,
360{
361 fn dereference_self(self, path: &TCPathBuf) -> Self {
362 match self {
363 Self::Get((subject, key)) => Self::Get((subject.dereference_self(path), key)),
364 Self::Put((subject, key, value)) => Self::Put((
365 subject.dereference_self(path),
366 key,
367 value.dereference_self(path),
368 )),
369 Self::Post((subject, params)) => {
370 if let Scalar::Map(params) = Scalar::Map(params).dereference_self(path) {
371 Self::Post((subject.dereference_self(path), params))
372 } else {
373 panic!("Scalar::Map::dereference_self did not return a Scalar::Map")
374 }
375 }
376 Self::Delete((subject, key)) => Self::Delete((subject.dereference_self(path), key)),
377 }
378 }
379
380 fn is_conditional(&self) -> bool {
381 match self {
382 Self::Get((_, key)) => key.is_conditional(),
383 Self::Put((_, key, value)) => key.is_conditional() || value.is_conditional(),
384 Self::Post((_, params)) => params.values().any(|scalar| scalar.is_conditional()),
385 Self::Delete((_, key)) => key.is_conditional(),
386 }
387 }
388
389 fn is_inter_service_write(&self, cluster_path: &[PathSegment]) -> bool {
390 let subject = match self {
391 Self::Put((Subject::Link(link), _, _)) => Some(link),
392 Self::Delete((Subject::Link(link), _)) => Some(link),
393 _ => None,
394 };
395
396 if let Some(link) = subject {
397 !link.path().starts_with(cluster_path)
398 } else {
399 false
400 }
401 }
402
403 fn is_ref(&self) -> bool {
404 true
405 }
406
407 fn reference_self(self, path: &TCPathBuf) -> Self {
408 match self {
409 Self::Get((subject, key)) => Self::Get((subject.reference_self(path), key)),
410 Self::Put((subject, key, value)) => Self::Put((
411 subject.reference_self(path),
412 key,
413 value.reference_self(path),
414 )),
415 Self::Post((subject, params)) => {
416 let params = if let Scalar::Map(params) = Scalar::Map(params).reference_self(path) {
417 params
418 } else {
419 panic!("Scalar::Map::reference_self did not return a Scalar::Map")
420 };
421
422 Self::Post((subject.reference_self(path), params))
423 }
424 Self::Delete((subject, key)) => Self::Delete((subject.reference_self(path), key)),
425 }
426 }
427
428 fn requires(&self, deps: &mut HashSet<Id>) {
429 match self {
430 Self::Get((subject, key)) => {
431 subject.requires(deps);
432 key.requires(deps);
433 }
434 Self::Put((subject, key, value)) => {
435 subject.requires(deps);
436 key.requires(deps);
437 value.requires(deps);
438 }
439 Self::Post((subject, params)) => {
440 subject.requires(deps);
441 for param in params.values() {
442 param.requires(deps);
443 }
444 }
445 Self::Delete((subject, key)) => {
446 subject.requires(deps);
447 key.requires(deps);
448 }
449 }
450 }
451
452 async fn resolve<'a, T: ToState<State> + Public<State> + Instance>(
453 self,
454 context: &'a Scope<'a, State, T>,
455 txn: &'a State::Txn,
456 ) -> TCResult<State> {
457 debug!("OpRef::resolve {:?} from context {:?}", self, context);
458
459 #[inline]
460 fn invalid_key<'a, State: fmt::Debug, T>(
461 subject: &'a T,
462 ) -> impl FnOnce(&State) -> TCError + 'a
463 where
464 T: fmt::Debug + 'a,
465 {
466 move |v| bad_request!("{:?} is not a valid key for {:?}", v, subject)
467 }
468
469 match self {
470 Self::Get((subject, key)) => match subject {
471 Subject::Link(link) => {
472 let key = resolve(key, context, txn).await?;
473 let key = Value::try_cast_from(key, invalid_key(&link))?;
474 txn.get(link, key).await
475 }
476 Subject::Ref(id_ref, path) => {
477 let key = resolve(key, context, txn).await?;
478 let key = key.try_cast_into(invalid_key(&id_ref))?;
479 context.resolve_get(txn, id_ref.id(), &path, key).await
480 }
481 },
482 Self::Put((subject, key, value)) => match subject {
483 Subject::Link(link) => {
484 let key = resolve(key, context, txn).await?;
485 let key = Value::try_cast_from(key, invalid_key(&link))?;
486
487 let value = resolve(value, context, txn).await?;
488
489 txn.put(link, key, value)
490 .map_ok(|()| State::default())
491 .await
492 }
493 Subject::Ref(id_ref, path) => {
494 let key = resolve(key, context, txn);
495 let value = resolve(value, context, txn);
496 let (key, value) = try_join!(key, value)?;
497
498 let key = key.try_cast_into(invalid_key(&id_ref))?;
499
500 context
501 .resolve_put(txn, id_ref.id(), &path, key, value)
502 .map_ok(|()| State::default())
503 .await
504 }
505 },
506 Self::Post((subject, params)) => match subject {
507 Subject::Link(link) => {
508 let params = resolve(Scalar::Map(params), context, txn).await?;
509 let params = Map::<State>::try_from(params)?;
510 txn.post(link, params).await
511 }
512 Subject::Ref(id_ref, path) => {
513 let params = resolve(Scalar::Map(params), context, txn).await?;
514 let params = params.try_into()?;
515 context.resolve_post(txn, id_ref.id(), &path, params).await
516 }
517 },
518 Self::Delete((subject, key)) => match subject {
519 Subject::Link(link) => {
520 let key = resolve(key, context, txn).await?;
521 let key = Value::try_cast_from(key, invalid_key(&link))?;
522 txn.delete(link, key).map_ok(|()| State::default()).await
523 }
524 Subject::Ref(id_ref, path) => {
525 let key = resolve(key, context, txn).await?;
526 let key = key.try_cast_into(invalid_key(&id_ref))?;
527
528 context
529 .resolve_delete(txn, id_ref.id(), &path, key)
530 .map_ok(|()| State::default())
531 .await
532 }
533 },
534 }
535 }
536}
537
538impl<'a, D: Digest> Hash<D> for &'a OpRef {
539 fn hash(self) -> Output<D> {
540 match self {
541 OpRef::Get(get) => Hash::<D>::hash(get),
542 OpRef::Put(put) => Hash::<D>::hash(put),
543 OpRef::Post((subject, params)) => Hash::<D>::hash((subject, params.deref())),
544 OpRef::Delete(delete) => Hash::<D>::hash(delete),
545 }
546 }
547}
548
549impl CastFrom<OpRef> for Tuple<Scalar> {
550 fn cast_from(value: OpRef) -> Self {
551 match value {
552 OpRef::Get((subject, key)) => (subject.into(), key).into(),
553 OpRef::Put((subject, key, value)) => (subject.into(), key, value).into(),
554 OpRef::Post((subject, params)) => (subject.into(), params.into()).cast_into(),
555 OpRef::Delete((subject, key)) => (subject.into(), key).cast_into(),
556 }
557 }
558}
559
560pub struct OpRefVisitor;
561
562impl OpRefVisitor {
563 pub async fn visit_map_value<A: de::MapAccess>(
564 class: OpRefType,
565 access: &mut A,
566 ) -> Result<OpRef, A::Error> {
567 use OpRefType as ORT;
568
569 match class {
570 ORT::Get => access.next_value(()).map_ok(OpRef::Get).await,
571 ORT::Put => access.next_value(()).map_ok(OpRef::Put).await,
572 ORT::Post => access.next_value(()).map_ok(OpRef::Post).await,
573 ORT::Delete => access.next_value(()).map_ok(OpRef::Delete).await,
574 }
575 }
576
577 pub fn visit_ref_value<E: de::Error>(subject: Subject, params: Scalar) -> Result<OpRef, E> {
578 debug!("OpRefVisitor::visit_ref_value {} {:?}", subject, params);
579
580 match params {
581 Scalar::Map(params) => Ok(OpRef::Post((subject, params))),
582 Scalar::Tuple(mut tuple) if tuple.len() == 1 => {
583 let key = tuple.pop().unwrap();
584 Ok(OpRef::Get((subject, key)))
585 }
586 Scalar::Tuple(mut tuple) if tuple.len() == 2 => {
587 if let Subject::Link(link) = &subject {
588 if link.host().is_none() && link.path().len() >= 2 {
589 let is_chain = &link.path()[..2] == &["state", "chain"];
591 let is_collection = &link.path()[..2] == &["state", "collection"];
592
593 if is_chain || is_collection {
594 return Err(E::custom(format!(
595 "a scalar is immutable and cannot contain a mutable type {}: {:?} (consider using a closure)",
596 link, tuple
597 )));
598 }
599 }
600 }
601
602 let value = tuple.pop().unwrap();
603 let key = tuple.pop().unwrap();
604
605 if subject == Subject::Link(OpRefType::Delete.path().into()) {
606 let subject = key.try_cast_into(|k| {
607 E::invalid_type(format!("{k:?}"), "a Link or Id reference")
608 })?;
609
610 Ok(OpRef::Delete((subject, value)))
611 } else {
612 Ok(OpRef::Put((subject, key, value)))
613 }
614 }
615 other => {
616 debug!(
617 "invalid parameters for method call on {:?}: {:?}",
618 subject, other
619 );
620
621 Err(de::Error::invalid_value(
622 format!("{other:?}"),
623 "OpRef parameters",
624 ))
625 }
626 }
627 }
628}
629
630#[async_trait]
631impl de::Visitor for OpRefVisitor {
632 type Value = OpRef;
633
634 fn expecting() -> &'static str {
635 "an OpRef, e.g. {\"$subject\": [\"key\"]}"
636 }
637
638 async fn visit_map<A: de::MapAccess>(self, mut access: A) -> Result<Self::Value, A::Error> {
639 let subject = access
640 .next_key::<Subject>(())
641 .await?
642 .ok_or_else(|| de::Error::custom("expected OpRef, found empty map"))?;
643
644 if let Subject::Link(link) = &subject {
645 if link.host().is_none() {
646 if let Some(class) = OpRefType::from_path(link.path()) {
647 return Self::visit_map_value(class, &mut access).await;
648 }
649 }
650 }
651
652 let params = access.next_value(()).await?;
653 Self::visit_ref_value(subject, params)
654 }
655}
656
657#[async_trait]
658impl de::FromStream for OpRef {
659 type Context = ();
660
661 async fn from_stream<D: de::Decoder>(_: (), d: &mut D) -> Result<Self, D::Error> {
662 d.decode_map(OpRefVisitor).await
663 }
664}
665
666impl<'en> en::ToStream<'en> for OpRef {
667 fn to_stream<E: en::Encoder<'en>>(&'en self, e: E) -> Result<E::Ok, E::Error> {
668 use en::EncodeMap;
669
670 let mut map = e.encode_map(Some(1))?;
671
672 match self {
673 OpRef::Get((path, key)) => map.encode_entry(path.to_string(), (key,))?,
674 OpRef::Put((path, key, value)) => map.encode_entry(path.to_string(), (key, value))?,
675 OpRef::Post((path, data)) => map.encode_entry(path.to_string(), data.deref())?,
676 OpRef::Delete((path, key)) => {
677 map.encode_key(OpRefType::Delete.path().to_string())?;
678 map.encode_value((path, key))?
679 }
680 }
681
682 map.end()
683 }
684}
685
686impl<'en> en::IntoStream<'en> for OpRef {
687 fn into_stream<E: en::Encoder<'en>>(self, e: E) -> Result<E::Ok, E::Error> {
688 use en::EncodeMap;
689
690 let mut map = e.encode_map(Some(1))?;
691
692 match self {
693 OpRef::Get((path, key)) => map.encode_entry(path.to_string(), (key,))?,
694 OpRef::Put((path, key, value)) => map.encode_entry(path.to_string(), (key, value))?,
695 OpRef::Post((path, data)) => map.encode_entry(path.to_string(), data.into_inner())?,
696 OpRef::Delete((path, key)) => {
697 map.encode_key(OpRefType::Delete.path().to_string())?;
698 map.encode_value((path, key))?
699 }
700 }
701
702 map.end()
703 }
704}
705
706impl fmt::Debug for OpRef {
707 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
708 let class = self.class();
709
710 match self {
711 OpRef::Get((link, key)) => write!(f, "{:?} {}?key={:?}", class, link, key),
712 OpRef::Put((path, key, value)) => {
713 write!(f, "{:?} {}?key={:?} <- {:?}", class, path, key, value)
714 }
715 OpRef::Post((path, params)) => write!(f, "{:?} {}({:?})", class, path, params),
716 OpRef::Delete((link, key)) => write!(f, "{:?} {}?key={:?}", class, link, key),
717 }
718 }
719}
720
721async fn resolve<'a, State, T, S>(
722 tc_ref: T,
723 context: &'a Scope<'a, State, S>,
724 txn: &'a State::Txn,
725) -> TCResult<State>
726where
727 State: StateInstance + Refer<State>,
728 T: Refer<State>,
729 S: ToState<State> + Public<State> + Instance,
730{
731 let mut state = tc_ref.resolve(context, txn).await?;
732 while state.is_ref() {
733 state = state.resolve(context, txn).await?;
734 }
735
736 Ok(state)
737}