1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use async_stream::stream;
7use futures::{StreamExt, pin_mut};
8use tonic::codegen::{Body, Bytes, StdError};
9
10use crate::{
11 client::{
12 BatchCheckItem, BatchCheckRequest, CheckRequest, CheckRequestTupleKey,
13 ConsistencyPreference, ContextualTupleKeys, ExpandRequest, ExpandRequestTupleKey,
14 ListObjectsRequest, ListObjectsResponse, OpenFgaServiceClient, ReadRequest,
15 ReadRequestTupleKey, ReadResponse, Tuple, TupleKey, TupleKeyWithoutCondition, UsersetTree,
16 WriteRequest, WriteRequestDeletes, WriteRequestWrites,
17 batch_check_single_result::CheckResult,
18 },
19 error::{Error, Result},
20};
21
22const DEFAULT_MAX_TUPLES_PER_WRITE: i32 = 100;
23
24#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
25pub enum ConflictBehavior {
30 #[default]
32 Fail,
33 Ignore,
35}
36
37impl ConflictBehavior {
38 fn as_str(&self) -> &str {
39 match self {
40 ConflictBehavior::Fail => "",
41 ConflictBehavior::Ignore => "ignore",
42 }
43 }
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub struct WriteOptions {
53 pub on_duplicate: ConflictBehavior,
55 pub on_missing: ConflictBehavior,
57}
58
59impl WriteOptions {
60 #[must_use]
61 pub fn new_idempotent() -> Self {
62 Self {
63 on_duplicate: ConflictBehavior::Ignore,
64 on_missing: ConflictBehavior::Ignore,
65 }
66 }
67}
68
69impl Default for WriteOptions {
70 fn default() -> Self {
71 Self {
72 on_duplicate: ConflictBehavior::Fail,
73 on_missing: ConflictBehavior::Fail,
74 }
75 }
76}
77
78#[derive(Clone, Debug)]
79pub struct OpenFgaClient<T> {
109 client: OpenFgaServiceClient<T>,
110 inner: Arc<ModelClientInner>,
111}
112
113#[derive(Debug, Clone)]
114struct ModelClientInner {
115 store_id: String,
116 authorization_model_id: String,
117 max_tuples_per_write: i32,
118 consistency: ConsistencyPreference,
119}
120
121#[cfg(feature = "auth-middle")]
122pub type BasicOpenFgaClient = OpenFgaClient<crate::client::BasicAuthLayer>;
127
128impl<T> OpenFgaClient<T>
129where
130 T: tonic::client::GrpcService<tonic::body::Body>,
131 T::Error: Into<StdError>,
132 T::ResponseBody: Body<Data = Bytes> + Send + 'static,
133 <T::ResponseBody as Body>::Error: Into<StdError> + Send,
134 T: Clone,
135{
136 #[must_use]
138 pub fn new(
139 client: OpenFgaServiceClient<T>,
140 store_id: &str,
141 authorization_model_id: &str,
142 ) -> Self {
143 OpenFgaClient {
144 client,
145 inner: Arc::new(ModelClientInner {
146 store_id: store_id.to_string(),
147 authorization_model_id: authorization_model_id.to_string(),
148 max_tuples_per_write: DEFAULT_MAX_TUPLES_PER_WRITE,
149 consistency: ConsistencyPreference::MinimizeLatency,
150 }),
151 }
152 }
153
154 #[must_use]
156 pub fn set_max_tuples_per_write(mut self, max_tuples_per_write: i32) -> Self {
157 let inner = Arc::unwrap_or_clone(self.inner);
158 self.inner = Arc::new(ModelClientInner {
159 store_id: inner.store_id,
160 authorization_model_id: inner.authorization_model_id,
161 max_tuples_per_write,
162 consistency: inner.consistency,
163 });
164 self
165 }
166
167 #[must_use]
169 pub fn set_consistency(mut self, consistency: impl Into<ConsistencyPreference>) -> Self {
170 let inner = Arc::unwrap_or_clone(self.inner);
171 self.inner = Arc::new(ModelClientInner {
172 store_id: inner.store_id,
173 authorization_model_id: inner.authorization_model_id,
174 max_tuples_per_write: inner.max_tuples_per_write,
175 consistency: consistency.into(),
176 });
177 self
178 }
179
180 pub fn store_id(&self) -> &str {
182 &self.inner.store_id
183 }
184
185 pub fn authorization_model_id(&self) -> &str {
187 &self.inner.authorization_model_id
188 }
189
190 pub fn max_tuples_per_write(&self) -> i32 {
192 self.inner.max_tuples_per_write
193 }
194
195 pub fn client(&self) -> OpenFgaServiceClient<T> {
197 self.client.clone()
198 }
199
200 pub fn consistency(&self) -> ConsistencyPreference {
202 self.inner.consistency
203 }
204
205 pub async fn write(
227 &self,
228 writes: impl Into<Option<Vec<TupleKey>>>,
229 deletes: impl Into<Option<Vec<TupleKeyWithoutCondition>>>,
230 ) -> Result<()> {
231 self.write_with_options(writes, deletes, WriteOptions::default())
232 .await
233 }
234
235 pub async fn write_with_options(
280 &self,
281 writes: impl Into<Option<Vec<TupleKey>>>,
282 deletes: impl Into<Option<Vec<TupleKeyWithoutCondition>>>,
283 options: WriteOptions,
284 ) -> Result<()> {
285 let writes = writes.into().and_then(|w| (!w.is_empty()).then_some(w));
286 let deletes = deletes.into().and_then(|d| (!d.is_empty()).then_some(d));
287
288 if writes.is_none() && deletes.is_none() {
289 return Ok(());
290 }
291
292 let num_writes_and_deletes = i32::try_from(
293 #[allow(clippy::manual_saturating_arithmetic)]
294 writes
295 .as_ref()
296 .map_or(0, Vec::len)
297 .checked_add(deletes.as_ref().map_or(0, Vec::len))
298 .unwrap_or(usize::MAX),
299 )
300 .unwrap_or(i32::MAX);
301
302 if num_writes_and_deletes > self.max_tuples_per_write() {
303 tracing::error!(
304 "Too many writes and deletes in single OpenFGA transaction (actual) {} > {} (max)",
305 num_writes_and_deletes,
306 self.max_tuples_per_write()
307 );
308 return Err(Error::TooManyWrites {
309 actual: num_writes_and_deletes,
310 max: self.max_tuples_per_write(),
311 });
312 }
313
314 let write_request = WriteRequest {
315 store_id: self.store_id().to_string(),
316 writes: writes.map(|writes| WriteRequestWrites {
317 tuple_keys: writes,
318 on_duplicate: options.on_duplicate.as_str().to_string(),
319 }),
320 deletes: deletes.map(|deletes| WriteRequestDeletes {
321 on_missing: options.on_missing.as_str().to_string(),
322 tuple_keys: deletes,
323 }),
324 authorization_model_id: self.authorization_model_id().to_string(),
325 };
326
327 self.client
328 .clone()
329 .write(write_request.clone())
330 .await
331 .map_err(|e| {
332 let write_request_debug = format!("{write_request:?}");
333 tracing::error!(
334 "Write request failed with status {e}. Request: {write_request_debug}"
335 );
336 Error::RequestFailed(Box::new(e))
337 })
338 .map(|_| ())
339 }
340
341 pub async fn read(
392 &self,
393 page_size: i32,
394 tuple_key: impl Into<Option<ReadRequestTupleKey>>,
395 continuation_token: impl Into<Option<String>>,
396 ) -> Result<tonic::Response<ReadResponse>> {
397 let read_request = ReadRequest {
398 store_id: self.store_id().to_string(),
399 page_size: Some(page_size),
400 continuation_token: continuation_token.into().unwrap_or_default(),
401 tuple_key: tuple_key.into(),
402 consistency: self.consistency().into(),
403 };
404 self.client
405 .clone()
406 .read(read_request.clone())
407 .await
408 .map_err(|e| {
409 let read_request_debug = format!("{read_request:?}");
410 tracing::error!(
411 "Read request failed with status {e}. Request: {read_request_debug}"
412 );
413 Error::RequestFailed(Box::new(e))
414 })
415 }
416
417 pub async fn read_all_pages(
435 &self,
436 tuple: Option<impl Into<ReadRequestTupleKey>>,
437 page_size: i32,
438 max_pages: u32,
439 ) -> Result<Vec<Tuple>> {
440 let store_id = self.store_id().to_string();
441 self.client
442 .clone()
443 .read_all_pages(&store_id, tuple, self.consistency(), page_size, max_pages)
444 .await
445 }
446
447 pub async fn check(
454 &self,
455 tuple_key: impl Into<CheckRequestTupleKey>,
456 contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
457 context: impl Into<Option<prost_wkt_types::Struct>>,
458 trace: bool,
459 ) -> Result<bool> {
460 let contextual_tuples = contextual_tuples
461 .into()
462 .and_then(|c| (!c.is_empty()).then_some(c))
463 .map(|tuple_keys| ContextualTupleKeys { tuple_keys });
464
465 let check_request = CheckRequest {
466 store_id: self.store_id().to_string(),
467 tuple_key: Some(tuple_key.into()),
468 consistency: self.consistency().into(),
469 contextual_tuples,
470 authorization_model_id: self.authorization_model_id().to_string(),
471 context: context.into(),
472 trace,
473 };
474 let response = self
475 .client
476 .clone()
477 .check(check_request.clone())
478 .await
479 .map_err(|e| {
480 let check_request_debug = format!("{check_request:?}");
481 tracing::error!(
482 "Check request failed with status {e}. Request: {check_request_debug}"
483 );
484 Error::RequestFailed(Box::new(e))
485 })?;
486 Ok(response.get_ref().allowed)
487 }
488
489 pub async fn batch_check<I>(
497 &self,
498 checks: impl IntoIterator<Item = I>,
499 ) -> Result<HashMap<String, CheckResult>>
500 where
501 I: Into<BatchCheckItem>,
502 {
503 let checks: Vec<BatchCheckItem> = checks.into_iter().map(Into::into).collect();
504 let request = BatchCheckRequest {
505 store_id: self.store_id().to_string(),
506 checks,
507 authorization_model_id: self.authorization_model_id().to_string(),
508 consistency: self.consistency().into(),
509 };
510
511 let response = self
512 .client
513 .clone()
514 .batch_check(request.clone())
515 .await
516 .map_err(|e| {
517 let request_debug = format!("{request:?}");
518 tracing::error!(
519 "Batch-Check request failed with status {e}. Request: {request_debug}"
520 );
521 Error::RequestFailed(Box::new(e))
522 })?;
523
524 let mut map = HashMap::new();
525 for (k, v) in response.into_inner().result {
526 match v.check_result {
527 Some(v) => map.insert(k, v),
531 None => return Err(Error::ExpectedOneof),
532 };
533 }
534 Ok(map)
535 }
536
537 pub async fn expand(
544 &self,
545 tuple_key: impl Into<ExpandRequestTupleKey>,
546 contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
547 ) -> Result<Option<UsersetTree>> {
548 let expand_request = ExpandRequest {
549 store_id: self.store_id().to_string(),
550 tuple_key: Some(tuple_key.into()),
551 authorization_model_id: self.authorization_model_id().to_string(),
552 consistency: self.consistency().into(),
553 contextual_tuples: contextual_tuples
554 .into()
555 .map(|tuple_keys| ContextualTupleKeys { tuple_keys }),
556 };
557 let response = self
558 .client
559 .clone()
560 .expand(expand_request.clone())
561 .await
562 .map_err(|e| {
563 tracing::error!(
564 "Expand request failed with status {e}. Request: {expand_request:?}"
565 );
566 Error::RequestFailed(Box::new(e))
567 })?;
568 Ok(response.into_inner().tree)
569 }
570
571 pub async fn check_simple(&self, tuple_key: impl Into<CheckRequestTupleKey>) -> Result<bool> {
576 self.check(tuple_key, None, None, false).await
577 }
578
579 pub async fn list_objects(
584 &self,
585 r#type: impl Into<String>,
586 relation: impl Into<String>,
587 user: impl Into<String>,
588 contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
589 context: impl Into<Option<prost_wkt_types::Struct>>,
590 ) -> Result<tonic::Response<ListObjectsResponse>> {
591 let request = ListObjectsRequest {
592 r#type: r#type.into(),
593 relation: relation.into(),
594 user: user.into(),
595 authorization_model_id: self.authorization_model_id().to_string(),
596 store_id: self.store_id().to_string(),
597 consistency: self.consistency().into(),
598 contextual_tuples: contextual_tuples
599 .into()
600 .map(|tuple_keys| ContextualTupleKeys { tuple_keys }),
601 context: context.into(),
602 };
603
604 self.client
605 .clone()
606 .list_objects(request.clone())
607 .await
608 .map_err(|e| {
609 tracing::error!(
610 "List-Objects request failed with status {e}. Request: {request:?}"
611 );
612 Error::RequestFailed(Box::new(e))
613 })
614 }
615
616 pub async fn delete_relations_to_object(&self, object: &str) -> Result<()> {
628 loop {
629 self.delete_relations_to_object_inner(object)
630 .await
631 .inspect_err(|e| {
632 tracing::error!("Failed to delete relations to object {object}: {e}");
633 })?;
634
635 if self.exists_relation_to(object).await? {
636 tracing::debug!(
637 "Some tuples for object {object} are still present after first sweep. Performing another deletion."
638 );
639 } else {
640 tracing::debug!("Successfully deleted all relations to object {object}");
641 break Ok(());
642 }
643 }
644 }
645
646 pub async fn exists_relation_to(&self, object: &str) -> Result<bool> {
652 let tuples = self.read_relations_to_object(object, None, 1).await?;
653 Ok(!tuples.tuples.is_empty())
654 }
655
656 async fn read_relations_to_object(
657 &self,
658 object: &str,
659 continuation_token: impl Into<Option<String>>,
660 page_size: i32,
661 ) -> Result<ReadResponse> {
662 self.read(
663 page_size,
664 TupleKeyWithoutCondition {
665 user: String::new(),
666 relation: String::new(),
667 object: object.to_string(),
668 },
669 continuation_token,
670 )
671 .await
672 .map(tonic::Response::into_inner)
673 }
674
675 async fn delete_relations_to_object_inner(&self, object: &str) -> Result<()> {
679 let read_stream = stream! {
680 let mut continuation_token = None;
681 let mut seen= HashSet::new();
684 while continuation_token != Some(String::new()) {
685 let response = self.read_relations_to_object(object, continuation_token, self.max_tuples_per_write()).await?;
686 let keys = response.tuples.into_iter().filter_map(|t| t.key).filter(|k| !seen.contains(&(k.user.clone(), k.relation.clone()))).collect::<Vec<_>>();
687 tracing::debug!("Read {} keys for object {object} that are up for deletion. Continuation token: {}", keys.len(), response.continuation_token);
688 continuation_token = Some(response.continuation_token);
689 seen.extend(keys.iter().map(|k| (k.user.clone(), k.relation.clone())));
690 yield Result::Ok(keys);
691 }
692 };
693 pin_mut!(read_stream);
694 let mut read_tuples: Option<Vec<TupleKey>> = None;
695
696 let delete_tuples = |t: Option<Vec<TupleKey>>| async {
697 match t {
698 Some(tuples) => {
699 tracing::debug!(
700 "Deleting {} tuples for object {object} that we haven't seen before.",
701 tuples.len()
702 );
703 self.write(
704 None,
705 Some(
706 tuples
707 .into_iter()
708 .map(|t| TupleKeyWithoutCondition {
709 user: t.user,
710 relation: t.relation,
711 object: t.object,
712 })
713 .collect(),
714 ),
715 )
716 .await
717 }
718 None => Ok(()),
719 }
720 };
721
722 loop {
723 let next_future = read_stream.next();
724 let deletion_future = delete_tuples(read_tuples.clone());
725
726 let (tuples, delete) = futures::join!(next_future, deletion_future);
727 delete?;
728
729 if let Some(tuples) = tuples.transpose()? {
730 read_tuples = (!tuples.is_empty()).then_some(tuples);
731 } else {
732 break Ok(());
733 }
734 }
735 }
736}
737
738#[cfg(test)]
739mod tests {
740 use needs_env_var::needs_env_var;
741
742 #[needs_env_var(TEST_OPENFGA_CLIENT_GRPC_URL)]
743 mod openfga {
744 use tracing_test::traced_test;
745
746 use super::super::*;
747 use crate::{
748 client::{AuthorizationModel, Store},
749 migration::test::openfga::service_client_with_store,
750 };
751
752 async fn write_custom_roles_model(
753 client: &OpenFgaServiceClient<tonic::transport::Channel>,
754 store: &Store,
755 ) -> String {
756 let model: AuthorizationModel = serde_json::from_str(include_str!(
757 "../tests/sample-store/custom-roles/schema.json"
758 ))
759 .unwrap();
760 client
761 .clone()
762 .write_authorization_model(model.into_write_request(store.id.clone()))
763 .await
764 .unwrap()
765 .into_inner()
766 .authorization_model_id
767 }
768
769 async fn get_client_with_custom_roles_model() -> OpenFgaClient<tonic::transport::Channel> {
770 let (service_client, store) = service_client_with_store().await;
771 let auth_model_id = write_custom_roles_model(&service_client, &store).await;
772
773 OpenFgaClient::new(service_client, &store.id, auth_model_id.as_str())
774 }
775
776 #[tokio::test]
780 #[traced_test]
781 async fn test_read_single_page_unfiltered() {
782 let client = get_client_with_custom_roles_model().await;
783
784 let total = 75;
787 for i in 0..total {
788 client
789 .write(
790 vec![TupleKey {
791 user: format!("user:user{i}"),
792 relation: "member".to_string(),
793 object: "team:team1".to_string(),
794 condition: None,
795 }],
796 None,
797 )
798 .await
799 .unwrap();
800 }
801
802 let resp = client
804 .read(100, None, None::<String>)
805 .await
806 .expect("read with None tuple_key must succeed");
807 let inner = resp.into_inner();
808 assert_eq!(inner.tuples.len(), total);
809 assert!(
810 inner.continuation_token.is_empty(),
811 "continuation token must be empty when all results fit in one page"
812 );
813
814 let resp = client
816 .read(50, None, None::<String>)
817 .await
818 .expect("read with None tuple_key must succeed");
819 let inner = resp.into_inner();
820 assert_eq!(inner.tuples.len(), 50);
821 assert!(
822 !inner.continuation_token.is_empty(),
823 "continuation token must be set when more pages are available"
824 );
825
826 let resp = client
828 .read(50, None, Some(inner.continuation_token))
829 .await
830 .expect("read with continuation token must succeed");
831 let inner = resp.into_inner();
832 assert_eq!(inner.tuples.len(), total - 50);
833 assert!(inner.continuation_token.is_empty());
834 }
835
836 #[tokio::test]
841 #[traced_test]
842 async fn test_read_single_page_filtered_backward_compat() {
843 let client = get_client_with_custom_roles_model().await;
844
845 client
846 .write(
847 vec![
848 TupleKey {
849 user: "user:alice".to_string(),
850 relation: "member".to_string(),
851 object: "team:team1".to_string(),
852 condition: None,
853 },
854 TupleKey {
855 user: "user:bob".to_string(),
856 relation: "member".to_string(),
857 object: "team:team2".to_string(),
858 condition: None,
859 },
860 ],
861 None,
862 )
863 .await
864 .unwrap();
865
866 let resp = client
868 .read(
869 100,
870 ReadRequestTupleKey {
871 user: String::new(),
872 relation: "member".to_string(),
873 object: "team:team1".to_string(),
874 },
875 None::<String>,
876 )
877 .await
878 .unwrap();
879 let inner = resp.into_inner();
880 assert_eq!(inner.tuples.len(), 1);
881 assert_eq!(inner.tuples[0].key.as_ref().unwrap().user, "user:alice");
882 }
883
884 #[tokio::test]
886 #[traced_test]
887 async fn test_read_all_pages_empty_tuple() {
888 let client = get_client_with_custom_roles_model().await;
889
890 let loop_count = 100;
891 let tuples_per_loop = 3;
892 for i in 0..loop_count {
893 client
896 .write(
897 vec![
898 TupleKey {
899 user: format!("user:user{i}"),
900 relation: "member".to_string(),
901 object: "team:team1".to_string(),
902 condition: None,
903 },
904 TupleKey {
905 user: format!("role:role{i}#assignee"),
906 relation: "role_assigner".to_string(),
907 object: "org:org1".to_string(),
908 condition: None,
909 },
910 TupleKey {
911 user: format!("org:org{i}"),
912 relation: "org".to_string(),
913 object: "asset-category:ac{i}".to_string(),
914 condition: None,
915 },
916 ],
917 None,
918 )
919 .await
920 .unwrap();
921 }
922
923 let tuples = client
924 .read_all_pages(None::<ReadRequestTupleKey>, 50, u32::MAX)
925 .await
926 .unwrap();
927 assert_eq!(tuples.len(), loop_count * tuples_per_loop);
928 }
929
930 #[tokio::test]
931 #[traced_test]
932 async fn test_delete_relations_to_object() {
933 let client = get_client_with_custom_roles_model().await;
934 let object = "team:team1";
935
936 assert!(!client.exists_relation_to(object).await.unwrap());
937
938 client
939 .write(
940 vec![TupleKey {
941 user: "user:user1".to_string(),
942 relation: "member".to_string(),
943 object: object.to_string(),
944 condition: None,
945 }],
946 None,
947 )
948 .await
949 .unwrap();
950 assert!(client.exists_relation_to(object).await.unwrap());
951 client.delete_relations_to_object(object).await.unwrap();
952 assert!(!client.exists_relation_to(object).await.unwrap());
953 }
954
955 #[tokio::test]
956 #[traced_test]
957 async fn test_delete_relations_to_object_usersets() {
958 let client = get_client_with_custom_roles_model().await;
959 let object: &str = "role:admin";
960
961 assert!(!client.exists_relation_to(object).await.unwrap());
962
963 client
964 .write(
965 vec![TupleKey {
966 user: "team:team1#member".to_string(),
967 relation: "assignee".to_string(),
968 object: object.to_string(),
969 condition: None,
970 }],
971 None,
972 )
973 .await
974 .unwrap();
975 assert!(client.exists_relation_to(object).await.unwrap());
976 client.delete_relations_to_object(object).await.unwrap();
977 assert!(!client.exists_relation_to(object).await.unwrap());
978 }
979
980 #[tokio::test]
981 #[traced_test]
982 async fn test_delete_relations_to_object_empty() {
983 let client = get_client_with_custom_roles_model().await;
984 let object = "team:team1";
985
986 assert!(!client.exists_relation_to(object).await.unwrap());
987 client.delete_relations_to_object(object).await.unwrap();
988 assert!(!client.exists_relation_to(object).await.unwrap());
989 }
990
991 #[tokio::test]
992 #[traced_test]
993 async fn test_delete_relations_to_object_many() {
994 let client = get_client_with_custom_roles_model().await;
995 let object = "org:org1";
996
997 assert!(!client.exists_relation_to(object).await.unwrap());
998
999 for i in 0..502 {
1000 client
1001 .write(
1002 vec![
1003 TupleKey {
1004 user: format!("user:user{i}"),
1005 relation: "member".to_string(),
1006 object: object.to_string(),
1007 condition: None,
1008 },
1009 TupleKey {
1010 user: format!("role:role{i}#assignee"),
1011 relation: "role_assigner".to_string(),
1012 object: object.to_string(),
1013 condition: None,
1014 },
1015 ],
1016 None,
1017 )
1018 .await
1019 .unwrap();
1020 }
1021
1022 let object_2 = "org:org2";
1024 client
1025 .write(
1026 vec![TupleKey {
1027 user: "user:user1".to_string(),
1028 relation: "owner".to_string(),
1029 object: object_2.to_string(),
1030 condition: None,
1031 }],
1032 None,
1033 )
1034 .await
1035 .unwrap();
1036
1037 assert!(client.exists_relation_to(object).await.unwrap());
1038 assert!(client.exists_relation_to(object_2).await.unwrap());
1039
1040 client.delete_relations_to_object(object).await.unwrap();
1041
1042 assert!(!client.exists_relation_to(object).await.unwrap());
1043 assert!(client.exists_relation_to(object_2).await.unwrap());
1044 assert!(
1045 client
1046 .check_simple(TupleKeyWithoutCondition {
1047 user: "user:user1".to_string(),
1048 relation: "role_assigner".to_string(),
1049 object: object_2.to_string(),
1050 })
1051 .await
1052 .unwrap()
1053 );
1054 }
1055
1056 #[tokio::test]
1057 #[traced_test]
1058 async fn test_write_with_options_ignore_duplicate() {
1059 let client = get_client_with_custom_roles_model().await;
1060 let tuple = TupleKey {
1061 user: "user:user1".to_string(),
1062 relation: "member".to_string(),
1063 object: "team:team1".to_string(),
1064 condition: None,
1065 };
1066
1067 client
1069 .write_with_options(vec![tuple.clone()], None, WriteOptions::default())
1070 .await
1071 .unwrap();
1072
1073 let result = client
1075 .write_with_options(vec![tuple.clone()], None, WriteOptions::default())
1076 .await;
1077 assert!(result.is_err());
1078
1079 let options = WriteOptions {
1081 on_duplicate: ConflictBehavior::Ignore,
1082 on_missing: ConflictBehavior::Fail,
1083 };
1084 client
1085 .write_with_options(vec![tuple], None, options)
1086 .await
1087 .unwrap();
1088 }
1089
1090 #[tokio::test]
1091 #[traced_test]
1092 async fn test_write_with_options_ignore_missing() {
1093 let client = get_client_with_custom_roles_model().await;
1094 let tuple_key = TupleKeyWithoutCondition {
1095 user: "user:user1".to_string(),
1096 relation: "member".to_string(),
1097 object: "team:team1".to_string(),
1098 };
1099
1100 let result = client
1102 .write_with_options(None, vec![tuple_key.clone()], WriteOptions::default())
1103 .await;
1104 assert!(result.is_err());
1105
1106 let options = WriteOptions {
1108 on_duplicate: ConflictBehavior::Fail,
1109 on_missing: ConflictBehavior::Ignore,
1110 };
1111 client
1112 .write_with_options(None, vec![tuple_key], options)
1113 .await
1114 .unwrap();
1115 }
1116
1117 #[tokio::test]
1118 #[traced_test]
1119 async fn test_write_with_options_idempotent() {
1120 let client = get_client_with_custom_roles_model().await;
1121 let tuple = TupleKey {
1122 user: "user:user1".to_string(),
1123 relation: "member".to_string(),
1124 object: "team:team1".to_string(),
1125 condition: None,
1126 };
1127
1128 let options = WriteOptions::new_idempotent();
1129
1130 client
1132 .write_with_options(vec![tuple.clone()], None, options)
1133 .await
1134 .unwrap();
1135 client
1136 .write_with_options(vec![tuple], None, options)
1137 .await
1138 .unwrap();
1139
1140 let tuple_key = TupleKeyWithoutCondition {
1142 user: "user:nonexistent".to_string(),
1143 relation: "member".to_string(),
1144 object: "team:team1".to_string(),
1145 };
1146 client
1147 .write_with_options(None, vec![tuple_key], options)
1148 .await
1149 .unwrap();
1150 }
1151
1152 #[tokio::test]
1153 #[traced_test]
1154 #[allow(clippy::similar_names)]
1155 async fn test_write_with_options_mixed_operations() {
1156 let client = get_client_with_custom_roles_model().await;
1157
1158 let tuple1 = TupleKey {
1160 user: "user:user1".to_string(),
1161 relation: "member".to_string(),
1162 object: "team:team1".to_string(),
1163 condition: None,
1164 };
1165 client.write(vec![tuple1.clone()], None).await.unwrap();
1166
1167 let tuple2 = TupleKey {
1169 user: "user:user2".to_string(),
1170 relation: "member".to_string(),
1171 object: "team:team1".to_string(),
1172 condition: None,
1173 };
1174 let delete_key = TupleKeyWithoutCondition {
1175 user: tuple1.user,
1176 relation: tuple1.relation,
1177 object: tuple1.object,
1178 };
1179
1180 client
1181 .write_with_options(vec![tuple2], vec![delete_key], WriteOptions::default())
1182 .await
1183 .unwrap();
1184
1185 let tuples = client
1187 .read_all_pages(
1188 Some(TupleKeyWithoutCondition {
1189 user: String::new(),
1190 relation: "member".to_string(),
1191 object: "team:team1".to_string(),
1192 }),
1193 10,
1194 10,
1195 )
1196 .await
1197 .unwrap();
1198 assert_eq!(tuples.len(), 1);
1199 assert_eq!(tuples[0].key.as_ref().unwrap().user, "user:user2");
1200 }
1201
1202 #[tokio::test]
1203 #[traced_test]
1204 async fn test_write_with_options_empty_operations() {
1205 let client = get_client_with_custom_roles_model().await;
1206
1207 let result = client
1209 .write_with_options(
1210 None::<Vec<TupleKey>>,
1211 None::<Vec<TupleKeyWithoutCondition>>,
1212 WriteOptions::default(),
1213 )
1214 .await;
1215 assert!(result.is_ok());
1216 }
1217 }
1218}