openfga_client/
model_client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use async_stream::stream;
7use futures::{StreamExt, pin_mut};
8use tonic::codegen::{Body, Bytes, StdError};
9
10use crate::{
11    client::{
12        BatchCheckItem, BatchCheckRequest, CheckRequest, CheckRequestTupleKey,
13        ConsistencyPreference, ContextualTupleKeys, ExpandRequest, ExpandRequestTupleKey,
14        ListObjectsRequest, ListObjectsResponse, OpenFgaServiceClient, ReadRequest,
15        ReadRequestTupleKey, ReadResponse, Tuple, TupleKey, TupleKeyWithoutCondition, UsersetTree,
16        WriteRequest, WriteRequestDeletes, WriteRequestWrites,
17        batch_check_single_result::CheckResult,
18    },
19    error::{Error, Result},
20};
21
22const DEFAULT_MAX_TUPLES_PER_WRITE: i32 = 100;
23
24#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
25/// Behavior when encountering conflicts during write operations.
26///
27/// See [OpenFGA documentation](https://openfga.dev/docs/getting-started/update-tuples#05-ignoring-duplicate-or-missing-tuples)
28/// for more details.
29pub enum ConflictBehavior {
30    /// Fail the operation on conflict (default behavior).
31    #[default]
32    Fail,
33    /// Ignore conflicts and continue processing.
34    Ignore,
35}
36
37impl ConflictBehavior {
38    fn as_str(&self) -> &str {
39        match self {
40            ConflictBehavior::Fail => "",
41            ConflictBehavior::Ignore => "ignore",
42        }
43    }
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47/// Options for write operations.
48///
49/// This allows you to control how OpenFGA handles duplicate writes and missing deletes.
50/// See [OpenFGA documentation](https://openfga.dev/docs/getting-started/update-tuples#05-ignoring-duplicate-or-missing-tuples)
51/// for more details.
52pub struct WriteOptions {
53    /// Behavior when writing a tuple that already exists.
54    pub on_duplicate: ConflictBehavior,
55    /// Behavior when deleting a tuple that doesn't exist.
56    pub on_missing: ConflictBehavior,
57}
58
59impl WriteOptions {
60    #[must_use]
61    pub fn new_idempotent() -> Self {
62        Self {
63            on_duplicate: ConflictBehavior::Ignore,
64            on_missing: ConflictBehavior::Ignore,
65        }
66    }
67}
68
69impl Default for WriteOptions {
70    fn default() -> Self {
71        Self {
72            on_duplicate: ConflictBehavior::Fail,
73            on_missing: ConflictBehavior::Fail,
74        }
75    }
76}
77
78#[derive(Clone, Debug)]
79/// Wrapper around the generated [`OpenFgaServiceClient`].
80///
81/// Why you should use this wrapper:
82///
83/// * Handles the `store_id` and `authorization_model_id` for you - you don't need to pass them in every request
84/// * Applies the same configured `consistency` to all requests
85/// * Ensures the number of writes and deletes does not exceed OpenFGA's limit
86/// * Uses tracing to log errors
87/// * Never sends empty writes or deletes, which fails on OpenFGA
88/// * Uses `impl Into<ReadRequestTupleKey>` arguments instead of very specific types like [`ReadRequestTupleKey`]
89/// * Most methods don't require mutable access to the client. Cloning tonic clients is cheap.
90/// * If a method is missing, the [`OpenFgaClient::client()`] provides access to the underlying client with full control
91///
92/// # Example
93///
94/// ```no_run
95/// use openfga_client::client::{OpenFgaServiceClient, OpenFgaClient};
96/// use tonic::transport::Channel;
97///
98/// #[tokio::main]
99/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
100///     let endpoint = "http://localhost:8080";
101///     let service_client = OpenFgaServiceClient::connect(endpoint).await?;
102///     let client = OpenFgaClient::new(service_client, "<store_id>", "<authorization_model_id>");
103///
104///     // Use the client to interact with OpenFGA
105///     Ok(())
106/// }
107/// ```
108pub struct OpenFgaClient<T> {
109    client: OpenFgaServiceClient<T>,
110    inner: Arc<ModelClientInner>,
111}
112
113#[derive(Debug, Clone)]
114struct ModelClientInner {
115    store_id: String,
116    authorization_model_id: String,
117    max_tuples_per_write: i32,
118    consistency: ConsistencyPreference,
119}
120
121#[cfg(feature = "auth-middle")]
122/// Specialization of the [`OpenFgaClient`] that includes optional
123/// authentication with pre-shared keys (Bearer tokens) or client credentials.
124/// For more fine-granular control, construct [`OpenFgaClient`] directly
125/// with a custom [`OpenFgaServiceClient`].
126pub type BasicOpenFgaClient = OpenFgaClient<crate::client::BasicAuthLayer>;
127
128impl<T> OpenFgaClient<T>
129where
130    T: tonic::client::GrpcService<tonic::body::Body>,
131    T::Error: Into<StdError>,
132    T::ResponseBody: Body<Data = Bytes> + Send + 'static,
133    <T::ResponseBody as Body>::Error: Into<StdError> + Send,
134    T: Clone,
135{
136    /// Create a new `OpenFgaModelClient` with the given `store_id` and `authorization_model_id`.
137    #[must_use]
138    pub fn new(
139        client: OpenFgaServiceClient<T>,
140        store_id: &str,
141        authorization_model_id: &str,
142    ) -> Self {
143        OpenFgaClient {
144            client,
145            inner: Arc::new(ModelClientInner {
146                store_id: store_id.to_string(),
147                authorization_model_id: authorization_model_id.to_string(),
148                max_tuples_per_write: DEFAULT_MAX_TUPLES_PER_WRITE,
149                consistency: ConsistencyPreference::MinimizeLatency,
150            }),
151        }
152    }
153
154    /// Set the `max_tuples_per_write` for the client.
155    #[must_use]
156    pub fn set_max_tuples_per_write(mut self, max_tuples_per_write: i32) -> Self {
157        let inner = Arc::unwrap_or_clone(self.inner);
158        self.inner = Arc::new(ModelClientInner {
159            store_id: inner.store_id,
160            authorization_model_id: inner.authorization_model_id,
161            max_tuples_per_write,
162            consistency: inner.consistency,
163        });
164        self
165    }
166
167    /// Set the `consistency` for the client.
168    #[must_use]
169    pub fn set_consistency(mut self, consistency: impl Into<ConsistencyPreference>) -> Self {
170        let inner = Arc::unwrap_or_clone(self.inner);
171        self.inner = Arc::new(ModelClientInner {
172            store_id: inner.store_id,
173            authorization_model_id: inner.authorization_model_id,
174            max_tuples_per_write: inner.max_tuples_per_write,
175            consistency: consistency.into(),
176        });
177        self
178    }
179
180    /// Get the `store_id` of the client.
181    pub fn store_id(&self) -> &str {
182        &self.inner.store_id
183    }
184
185    /// Get the `authorization_model_id` of the client.
186    pub fn authorization_model_id(&self) -> &str {
187        &self.inner.authorization_model_id
188    }
189
190    /// Get the `max_tuples_per_write` of the client.
191    pub fn max_tuples_per_write(&self) -> i32 {
192        self.inner.max_tuples_per_write
193    }
194
195    /// Get the underlying `OpenFgaServiceClient`.
196    pub fn client(&self) -> OpenFgaServiceClient<T> {
197        self.client.clone()
198    }
199
200    /// Get the `consistency` of the client.
201    pub fn consistency(&self) -> ConsistencyPreference {
202        self.inner.consistency
203    }
204
205    /// Write or delete tuples from FGA.
206    /// This is a wrapper around [`OpenFgaServiceClient::write`] that ensures that:
207    ///
208    /// * Ensures the number of writes and deletes does not exceed OpenFGA's limit
209    /// * Does not send empty writes or deletes
210    /// * Traces any errors that occur
211    /// * Enriches the error with the `write_request` that caused the error
212    ///
213    /// All writes happen in a single transaction.
214    ///
215    /// OpenFGA currently has a default limit of 100 tuples per write
216    /// (sum of writes and deletes).
217    ///
218    /// This `write` method will fail if the number of writes and deletes exceeds
219    /// `max_tuples_per_write` which defaults to 100.
220    /// To change this limit, use [`Self::set_max_tuples_per_write`].
221    ///
222    /// # Errors
223    /// * [`Error::TooManyWrites`] if the number of writes and deletes exceeds `max_tuples_per_write`
224    /// * [`Error::RequestFailed`] if the write request fails
225    ///
226    pub async fn write(
227        &self,
228        writes: impl Into<Option<Vec<TupleKey>>>,
229        deletes: impl Into<Option<Vec<TupleKeyWithoutCondition>>>,
230    ) -> Result<()> {
231        self.write_with_options(writes, deletes, WriteOptions::default())
232            .await
233    }
234
235    /// Write or delete tuples from FGA with custom conflict handling options.
236    /// This is a wrapper around [`OpenFgaServiceClient::write`] that ensures that:
237    ///
238    /// * Ensures the number of writes and deletes does not exceed OpenFGA's limit
239    /// * Does not send empty writes or deletes
240    /// * Traces any errors that occur
241    /// * Enriches the error with the `write_request` that caused the error
242    /// * Allows configuring behavior for duplicate writes and missing deletes
243    ///
244    /// All writes happen in a single transaction.
245    ///
246    /// OpenFGA currently has a default limit of 100 tuples per write
247    /// (sum of writes and deletes).
248    ///
249    /// This `write_with_options` method will fail if the number of writes and deletes exceeds
250    /// `max_tuples_per_write` which defaults to 100.
251    /// To change this limit, use [`Self::set_max_tuples_per_write`].
252    ///
253    /// # Example
254    ///
255    /// ```no_run
256    /// use openfga_client::client::{ConflictBehavior, WriteOptions, OpenFgaClient, OpenFgaServiceClient};
257    ///
258    /// #[tokio::main]
259    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
260    ///     let endpoint = "http://localhost:8080";
261    ///     let service_client = OpenFgaServiceClient::connect(endpoint).await?;
262    ///     let client = OpenFgaClient::new(service_client, "store_id", "model_id");
263    ///     
264    ///     let options = WriteOptions {
265    ///         on_duplicate: ConflictBehavior::Ignore,
266    ///         on_missing: ConflictBehavior::Ignore,
267    ///     };
268    ///
269    ///     let writes = vec![/* TupleKey instances */];
270    ///     client.write_with_options(writes, None, options).await?;
271    ///     Ok(())
272    /// }
273    /// ```
274    ///
275    /// # Errors
276    /// * [`Error::TooManyWrites`] if the number of writes and deletes exceeds `max_tuples_per_write`
277    /// * [`Error::RequestFailed`] if the write request fails
278    ///
279    pub async fn write_with_options(
280        &self,
281        writes: impl Into<Option<Vec<TupleKey>>>,
282        deletes: impl Into<Option<Vec<TupleKeyWithoutCondition>>>,
283        options: WriteOptions,
284    ) -> Result<()> {
285        let writes = writes.into().and_then(|w| (!w.is_empty()).then_some(w));
286        let deletes = deletes.into().and_then(|d| (!d.is_empty()).then_some(d));
287
288        if writes.is_none() && deletes.is_none() {
289            return Ok(());
290        }
291
292        let num_writes_and_deletes = i32::try_from(
293            #[allow(clippy::manual_saturating_arithmetic)]
294            writes
295                .as_ref()
296                .map_or(0, Vec::len)
297                .checked_add(deletes.as_ref().map_or(0, Vec::len))
298                .unwrap_or(usize::MAX),
299        )
300        .unwrap_or(i32::MAX);
301
302        if num_writes_and_deletes > self.max_tuples_per_write() {
303            tracing::error!(
304                "Too many writes and deletes in single OpenFGA transaction (actual) {} > {} (max)",
305                num_writes_and_deletes,
306                self.max_tuples_per_write()
307            );
308            return Err(Error::TooManyWrites {
309                actual: num_writes_and_deletes,
310                max: self.max_tuples_per_write(),
311            });
312        }
313
314        let write_request = WriteRequest {
315            store_id: self.store_id().to_string(),
316            writes: writes.map(|writes| WriteRequestWrites {
317                tuple_keys: writes,
318                on_duplicate: options.on_duplicate.as_str().to_string(),
319            }),
320            deletes: deletes.map(|deletes| WriteRequestDeletes {
321                on_missing: options.on_missing.as_str().to_string(),
322                tuple_keys: deletes,
323            }),
324            authorization_model_id: self.authorization_model_id().to_string(),
325        };
326
327        self.client
328            .clone()
329            .write(write_request.clone())
330            .await
331            .map_err(|e| {
332                let write_request_debug = format!("{write_request:?}");
333                tracing::error!(
334                    "Write request failed with status {e}. Request: {write_request_debug}"
335                );
336                Error::RequestFailed(Box::new(e))
337            })
338            .map(|_| ())
339    }
340
341    /// Read tuples from OpenFGA.
342    /// This is a wrapper around [`OpenFgaServiceClient::read`] that:
343    ///
344    /// * Traces any errors that occur
345    /// * Enriches the error with the `read_request` that caused the error
346    ///
347    /// # Errors
348    /// * [`Error::RequestFailed`] if the read request fails
349    pub async fn read(
350        &self,
351        page_size: i32,
352        tuple_key: impl Into<ReadRequestTupleKey>,
353        continuation_token: impl Into<Option<String>>,
354    ) -> Result<tonic::Response<ReadResponse>> {
355        let read_request = ReadRequest {
356            store_id: self.store_id().to_string(),
357            page_size: Some(page_size),
358            continuation_token: continuation_token.into().unwrap_or_default(),
359            tuple_key: Some(tuple_key.into()),
360            consistency: self.consistency().into(),
361        };
362        self.client
363            .clone()
364            .read(read_request.clone())
365            .await
366            .map_err(|e| {
367                let read_request_debug = format!("{read_request:?}");
368                tracing::error!(
369                    "Read request failed with status {e}. Request: {read_request_debug}"
370                );
371                Error::RequestFailed(Box::new(e))
372            })
373    }
374
375    /// Read all tuples, with pagination.
376    /// For details on the parameters, see [`OpenFgaServiceClient::read_all_pages`].
377    ///
378    /// # Errors
379    /// * [`Error::RequestFailed`] If a request to OpenFGA fails.
380    /// * [`Error::TooManyPages`] If the number of pages read exceeds `max_pages`.
381    ///
382    pub async fn read_all_pages(
383        &self,
384        tuple: Option<impl Into<ReadRequestTupleKey>>,
385        page_size: i32,
386        max_pages: u32,
387    ) -> Result<Vec<Tuple>> {
388        let store_id = self.store_id().to_string();
389        self.client
390            .clone()
391            .read_all_pages(&store_id, tuple, self.consistency(), page_size, max_pages)
392            .await
393    }
394
395    /// Perform a check.
396    /// Returns `true` if the check is allowed, `false` otherwise.
397    ///
398    /// # Errors
399    /// * [`Error::RequestFailed`] if the check request fails
400    ///
401    pub async fn check(
402        &self,
403        tuple_key: impl Into<CheckRequestTupleKey>,
404        contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
405        context: impl Into<Option<prost_wkt_types::Struct>>,
406        trace: bool,
407    ) -> Result<bool> {
408        let contextual_tuples = contextual_tuples
409            .into()
410            .and_then(|c| (!c.is_empty()).then_some(c))
411            .map(|tuple_keys| ContextualTupleKeys { tuple_keys });
412
413        let check_request = CheckRequest {
414            store_id: self.store_id().to_string(),
415            tuple_key: Some(tuple_key.into()),
416            consistency: self.consistency().into(),
417            contextual_tuples,
418            authorization_model_id: self.authorization_model_id().to_string(),
419            context: context.into(),
420            trace,
421        };
422        let response = self
423            .client
424            .clone()
425            .check(check_request.clone())
426            .await
427            .map_err(|e| {
428                let check_request_debug = format!("{check_request:?}");
429                tracing::error!(
430                    "Check request failed with status {e}. Request: {check_request_debug}"
431                );
432                Error::RequestFailed(Box::new(e))
433            })?;
434        Ok(response.get_ref().allowed)
435    }
436
437    /// Check multiple tuples at once.
438    /// Returned `HashMap` contains one key for each `correlation_id` in the input.
439    ///
440    /// # Errors
441    /// * [`Error::RequestFailed`] if the check request fails
442    /// * [`Error::ExpectedOneof`] if the server unexpectedly returns `None` for one of the tuples
443    ///   to check.
444    pub async fn batch_check<I>(
445        &self,
446        checks: impl IntoIterator<Item = I>,
447    ) -> Result<HashMap<String, CheckResult>>
448    where
449        I: Into<BatchCheckItem>,
450    {
451        let checks: Vec<BatchCheckItem> = checks.into_iter().map(Into::into).collect();
452        let request = BatchCheckRequest {
453            store_id: self.store_id().to_string(),
454            checks,
455            authorization_model_id: self.authorization_model_id().to_string(),
456            consistency: self.consistency().into(),
457        };
458
459        let response = self
460            .client
461            .clone()
462            .batch_check(request.clone())
463            .await
464            .map_err(|e| {
465                let request_debug = format!("{request:?}");
466                tracing::error!(
467                    "Batch-Check request failed with status {e}. Request: {request_debug}"
468                );
469                Error::RequestFailed(Box::new(e))
470            })?;
471
472        let mut map = HashMap::new();
473        for (k, v) in response.into_inner().result {
474            match v.check_result {
475                // The server should return `Some(_)` for every tuple to check.
476                // `None` is not expected to occur, hence returning an error for the *entire*
477                // batch request to keep the API simple.
478                Some(v) => map.insert(k, v),
479                None => return Err(Error::ExpectedOneof),
480            };
481        }
482        Ok(map)
483    }
484
485    /// Expand all relationships in userset tree format.
486    /// Useful to reason about and debug a certain relationship.
487    ///
488    /// # Errors
489    /// * [`Error::RequestFailed`] if the expand request fails
490    ///
491    pub async fn expand(
492        &self,
493        tuple_key: impl Into<ExpandRequestTupleKey>,
494        contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
495    ) -> Result<Option<UsersetTree>> {
496        let expand_request = ExpandRequest {
497            store_id: self.store_id().to_string(),
498            tuple_key: Some(tuple_key.into()),
499            authorization_model_id: self.authorization_model_id().to_string(),
500            consistency: self.consistency().into(),
501            contextual_tuples: contextual_tuples
502                .into()
503                .map(|tuple_keys| ContextualTupleKeys { tuple_keys }),
504        };
505        let response = self
506            .client
507            .clone()
508            .expand(expand_request.clone())
509            .await
510            .map_err(|e| {
511                tracing::error!(
512                    "Expand request failed with status {e}. Request: {expand_request:?}"
513                );
514                Error::RequestFailed(Box::new(e))
515            })?;
516        Ok(response.into_inner().tree)
517    }
518
519    /// Simplified version of [`Self::check`] without contextual tuples, context, or trace.
520    ///
521    /// # Errors
522    /// Check the [`Self::check`] method for possible errors.
523    pub async fn check_simple(&self, tuple_key: impl Into<CheckRequestTupleKey>) -> Result<bool> {
524        self.check(tuple_key, None, None, false).await
525    }
526
527    /// List all objects of the given type that the user has a relation with.
528    ///
529    /// # Errors
530    /// * [`Error::RequestFailed`] if the list-objects request fails
531    pub async fn list_objects(
532        &self,
533        r#type: impl Into<String>,
534        relation: impl Into<String>,
535        user: impl Into<String>,
536        contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
537        context: impl Into<Option<prost_wkt_types::Struct>>,
538    ) -> Result<tonic::Response<ListObjectsResponse>> {
539        let request = ListObjectsRequest {
540            r#type: r#type.into(),
541            relation: relation.into(),
542            user: user.into(),
543            authorization_model_id: self.authorization_model_id().to_string(),
544            store_id: self.store_id().to_string(),
545            consistency: self.consistency().into(),
546            contextual_tuples: contextual_tuples
547                .into()
548                .map(|tuple_keys| ContextualTupleKeys { tuple_keys }),
549            context: context.into(),
550        };
551
552        self.client
553            .clone()
554            .list_objects(request.clone())
555            .await
556            .map_err(|e| {
557                tracing::error!(
558                    "List-Objects request failed with status {e}. Request: {request:?}"
559                );
560                Error::RequestFailed(Box::new(e))
561            })
562    }
563
564    /// Delete all relations that other entities have to the given `object`, that
565    /// is, all tuples with the "object" field set to the given `object`.
566    ///
567    /// This method uses streamed pagination internally, so that also large amounts of tuples can be deleted.
568    /// Please not that this method does not delete tuples where the given object has a relation TO another entity.
569    ///
570    /// Iteration is stopped when no more tuples are returned from OpenFGA.
571    ///
572    /// # Errors
573    /// * [`Error::RequestFailed`] if a read or delete request fails
574    ///
575    pub async fn delete_relations_to_object(&self, object: &str) -> Result<()> {
576        loop {
577            self.delete_relations_to_object_inner(object)
578                .await
579                .inspect_err(|e| {
580                    tracing::error!("Failed to delete relations to object {object}: {e}");
581                })?;
582
583            if self.exists_relation_to(object).await? {
584                tracing::debug!(
585                    "Some tuples for object {object} are still present after first sweep. Performing another deletion."
586                );
587            } else {
588                tracing::debug!("Successfully deleted all relations to object {object}");
589                break Ok(());
590            }
591        }
592    }
593
594    /// Check if any direct relation to the given object exists.
595    /// This does not check if the object is used as a user in relations to other objects.
596    ///
597    /// # Errors
598    /// * [`Error::RequestFailed`] if the read request fails
599    pub async fn exists_relation_to(&self, object: &str) -> Result<bool> {
600        let tuples = self.read_relations_to_object(object, None, 1).await?;
601        Ok(!tuples.tuples.is_empty())
602    }
603
604    async fn read_relations_to_object(
605        &self,
606        object: &str,
607        continuation_token: impl Into<Option<String>>,
608        page_size: i32,
609    ) -> Result<ReadResponse> {
610        self.read(
611            page_size,
612            TupleKeyWithoutCondition {
613                user: String::new(),
614                relation: String::new(),
615                object: object.to_string(),
616            },
617            continuation_token,
618        )
619        .await
620        .map(tonic::Response::into_inner)
621    }
622
623    /// # Errors
624    /// * [`Error::RequestFailed`] if a read or delete request fails
625    ///
626    async fn delete_relations_to_object_inner(&self, object: &str) -> Result<()> {
627        let read_stream = stream! {
628            let mut continuation_token = None;
629            // We need to keep track of seen keys, as OpenFGA might return
630            // duplicates even of `HigherConsistency`.
631            let mut seen= HashSet::new();
632            while continuation_token != Some(String::new()) {
633                let response = self.read_relations_to_object(object, continuation_token, self.max_tuples_per_write()).await?;
634                let keys = response.tuples.into_iter().filter_map(|t| t.key).filter(|k| !seen.contains(&(k.user.clone(), k.relation.clone()))).collect::<Vec<_>>();
635                tracing::debug!("Read {} keys for object {object} that are up for deletion. Continuation token: {}", keys.len(), response.continuation_token);
636                continuation_token = Some(response.continuation_token);
637                seen.extend(keys.iter().map(|k| (k.user.clone(), k.relation.clone())));
638                yield Result::Ok(keys);
639            }
640        };
641        pin_mut!(read_stream);
642        let mut read_tuples: Option<Vec<TupleKey>> = None;
643
644        let delete_tuples = |t: Option<Vec<TupleKey>>| async {
645            match t {
646                Some(tuples) => {
647                    tracing::debug!(
648                        "Deleting {} tuples for object {object} that we haven't seen before.",
649                        tuples.len()
650                    );
651                    self.write(
652                        None,
653                        Some(
654                            tuples
655                                .into_iter()
656                                .map(|t| TupleKeyWithoutCondition {
657                                    user: t.user,
658                                    relation: t.relation,
659                                    object: t.object,
660                                })
661                                .collect(),
662                        ),
663                    )
664                    .await
665                }
666                None => Ok(()),
667            }
668        };
669
670        loop {
671            let next_future = read_stream.next();
672            let deletion_future = delete_tuples(read_tuples.clone());
673
674            let (tuples, delete) = futures::join!(next_future, deletion_future);
675            delete?;
676
677            if let Some(tuples) = tuples.transpose()? {
678                read_tuples = (!tuples.is_empty()).then_some(tuples);
679            } else {
680                break Ok(());
681            }
682        }
683    }
684}
685
686#[cfg(test)]
687mod tests {
688    use needs_env_var::needs_env_var;
689
690    #[needs_env_var(TEST_OPENFGA_CLIENT_GRPC_URL)]
691    mod openfga {
692        use tracing_test::traced_test;
693
694        use super::super::*;
695        use crate::{
696            client::{AuthorizationModel, Store},
697            migration::test::openfga::service_client_with_store,
698        };
699
700        async fn write_custom_roles_model(
701            client: &OpenFgaServiceClient<tonic::transport::Channel>,
702            store: &Store,
703        ) -> String {
704            let model: AuthorizationModel = serde_json::from_str(include_str!(
705                "../tests/sample-store/custom-roles/schema.json"
706            ))
707            .unwrap();
708            client
709                .clone()
710                .write_authorization_model(model.into_write_request(store.id.clone()))
711                .await
712                .unwrap()
713                .into_inner()
714                .authorization_model_id
715        }
716
717        async fn get_client_with_custom_roles_model() -> OpenFgaClient<tonic::transport::Channel> {
718            let (service_client, store) = service_client_with_store().await;
719            let auth_model_id = write_custom_roles_model(&service_client, &store).await;
720
721            OpenFgaClient::new(service_client, &store.id, auth_model_id.as_str())
722        }
723
724        /// Verifies that all pages are read when *not* passing a `ReadRequestTupleKey`.
725        #[tokio::test]
726        #[traced_test]
727        async fn test_read_all_pages_empty_tuple() {
728            let client = get_client_with_custom_roles_model().await;
729
730            let loop_count = 100;
731            let tuples_per_loop = 3;
732            for i in 0..loop_count {
733                // Write to different relations with different users and objects to test that an
734                // empty ReadRequestTupleKey does not filter for anything.
735                client
736                    .write(
737                        vec![
738                            TupleKey {
739                                user: format!("user:user{i}"),
740                                relation: "member".to_string(),
741                                object: "team:team1".to_string(),
742                                condition: None,
743                            },
744                            TupleKey {
745                                user: format!("role:role{i}#assignee"),
746                                relation: "role_assigner".to_string(),
747                                object: "org:org1".to_string(),
748                                condition: None,
749                            },
750                            TupleKey {
751                                user: format!("org:org{i}"),
752                                relation: "org".to_string(),
753                                object: "asset-category:ac{i}".to_string(),
754                                condition: None,
755                            },
756                        ],
757                        None,
758                    )
759                    .await
760                    .unwrap();
761            }
762
763            let tuples = client
764                .read_all_pages(None::<ReadRequestTupleKey>, 50, u32::MAX)
765                .await
766                .unwrap();
767            assert_eq!(tuples.len(), loop_count * tuples_per_loop);
768        }
769
770        #[tokio::test]
771        #[traced_test]
772        async fn test_delete_relations_to_object() {
773            let client = get_client_with_custom_roles_model().await;
774            let object = "team:team1";
775
776            assert!(!client.exists_relation_to(object).await.unwrap());
777
778            client
779                .write(
780                    vec![TupleKey {
781                        user: "user:user1".to_string(),
782                        relation: "member".to_string(),
783                        object: object.to_string(),
784                        condition: None,
785                    }],
786                    None,
787                )
788                .await
789                .unwrap();
790            assert!(client.exists_relation_to(object).await.unwrap());
791            client.delete_relations_to_object(object).await.unwrap();
792            assert!(!client.exists_relation_to(object).await.unwrap());
793        }
794
795        #[tokio::test]
796        #[traced_test]
797        async fn test_delete_relations_to_object_usersets() {
798            let client = get_client_with_custom_roles_model().await;
799            let object: &str = "role:admin";
800
801            assert!(!client.exists_relation_to(object).await.unwrap());
802
803            client
804                .write(
805                    vec![TupleKey {
806                        user: "team:team1#member".to_string(),
807                        relation: "assignee".to_string(),
808                        object: object.to_string(),
809                        condition: None,
810                    }],
811                    None,
812                )
813                .await
814                .unwrap();
815            assert!(client.exists_relation_to(object).await.unwrap());
816            client.delete_relations_to_object(object).await.unwrap();
817            assert!(!client.exists_relation_to(object).await.unwrap());
818        }
819
820        #[tokio::test]
821        #[traced_test]
822        async fn test_delete_relations_to_object_empty() {
823            let client = get_client_with_custom_roles_model().await;
824            let object = "team:team1";
825
826            assert!(!client.exists_relation_to(object).await.unwrap());
827            client.delete_relations_to_object(object).await.unwrap();
828            assert!(!client.exists_relation_to(object).await.unwrap());
829        }
830
831        #[tokio::test]
832        #[traced_test]
833        async fn test_delete_relations_to_object_many() {
834            let client = get_client_with_custom_roles_model().await;
835            let object = "org:org1";
836
837            assert!(!client.exists_relation_to(object).await.unwrap());
838
839            for i in 0..502 {
840                client
841                    .write(
842                        vec![
843                            TupleKey {
844                                user: format!("user:user{i}"),
845                                relation: "member".to_string(),
846                                object: object.to_string(),
847                                condition: None,
848                            },
849                            TupleKey {
850                                user: format!("role:role{i}#assignee"),
851                                relation: "role_assigner".to_string(),
852                                object: object.to_string(),
853                                condition: None,
854                            },
855                        ],
856                        None,
857                    )
858                    .await
859                    .unwrap();
860            }
861
862            // Also write a tuple for another org to make sure we don't delete those
863            let object_2 = "org:org2";
864            client
865                .write(
866                    vec![TupleKey {
867                        user: "user:user1".to_string(),
868                        relation: "owner".to_string(),
869                        object: object_2.to_string(),
870                        condition: None,
871                    }],
872                    None,
873                )
874                .await
875                .unwrap();
876
877            assert!(client.exists_relation_to(object).await.unwrap());
878            assert!(client.exists_relation_to(object_2).await.unwrap());
879
880            client.delete_relations_to_object(object).await.unwrap();
881
882            assert!(!client.exists_relation_to(object).await.unwrap());
883            assert!(client.exists_relation_to(object_2).await.unwrap());
884            assert!(
885                client
886                    .check_simple(TupleKeyWithoutCondition {
887                        user: "user:user1".to_string(),
888                        relation: "role_assigner".to_string(),
889                        object: object_2.to_string(),
890                    })
891                    .await
892                    .unwrap()
893            );
894        }
895
896        #[tokio::test]
897        #[traced_test]
898        async fn test_write_with_options_ignore_duplicate() {
899            let client = get_client_with_custom_roles_model().await;
900            let tuple = TupleKey {
901                user: "user:user1".to_string(),
902                relation: "member".to_string(),
903                object: "team:team1".to_string(),
904                condition: None,
905            };
906
907            // First write should succeed
908            client
909                .write_with_options(vec![tuple.clone()], None, WriteOptions::default())
910                .await
911                .unwrap();
912
913            // Second write with default options should fail
914            let result = client
915                .write_with_options(vec![tuple.clone()], None, WriteOptions::default())
916                .await;
917            assert!(result.is_err());
918
919            // Write with ignore duplicate should succeed
920            let options = WriteOptions {
921                on_duplicate: ConflictBehavior::Ignore,
922                on_missing: ConflictBehavior::Fail,
923            };
924            client
925                .write_with_options(vec![tuple], None, options)
926                .await
927                .unwrap();
928        }
929
930        #[tokio::test]
931        #[traced_test]
932        async fn test_write_with_options_ignore_missing() {
933            let client = get_client_with_custom_roles_model().await;
934            let tuple_key = TupleKeyWithoutCondition {
935                user: "user:user1".to_string(),
936                relation: "member".to_string(),
937                object: "team:team1".to_string(),
938            };
939
940            // Delete non-existent tuple with default options should fail
941            let result = client
942                .write_with_options(None, vec![tuple_key.clone()], WriteOptions::default())
943                .await;
944            assert!(result.is_err());
945
946            // Delete with ignore missing should succeed
947            let options = WriteOptions {
948                on_duplicate: ConflictBehavior::Fail,
949                on_missing: ConflictBehavior::Ignore,
950            };
951            client
952                .write_with_options(None, vec![tuple_key], options)
953                .await
954                .unwrap();
955        }
956
957        #[tokio::test]
958        #[traced_test]
959        async fn test_write_with_options_idempotent() {
960            let client = get_client_with_custom_roles_model().await;
961            let tuple = TupleKey {
962                user: "user:user1".to_string(),
963                relation: "member".to_string(),
964                object: "team:team1".to_string(),
965                condition: None,
966            };
967
968            let options = WriteOptions::new_idempotent();
969
970            // Write twice with idempotent options should succeed both times
971            client
972                .write_with_options(vec![tuple.clone()], None, options)
973                .await
974                .unwrap();
975            client
976                .write_with_options(vec![tuple], None, options)
977                .await
978                .unwrap();
979
980            // Delete non-existent tuple with idempotent options should succeed
981            let tuple_key = TupleKeyWithoutCondition {
982                user: "user:nonexistent".to_string(),
983                relation: "member".to_string(),
984                object: "team:team1".to_string(),
985            };
986            client
987                .write_with_options(None, vec![tuple_key], options)
988                .await
989                .unwrap();
990        }
991
992        #[tokio::test]
993        #[traced_test]
994        async fn test_write_with_options_mixed_operations() {
995            let client = get_client_with_custom_roles_model().await;
996
997            // First, write a tuple
998            let tuple1 = TupleKey {
999                user: "user:user1".to_string(),
1000                relation: "member".to_string(),
1001                object: "team:team1".to_string(),
1002                condition: None,
1003            };
1004            client.write(vec![tuple1.clone()], None).await.unwrap();
1005
1006            // Now write a new tuple and delete the existing one in the same request
1007            let tuple2 = TupleKey {
1008                user: "user:user2".to_string(),
1009                relation: "member".to_string(),
1010                object: "team:team1".to_string(),
1011                condition: None,
1012            };
1013            let delete_key = TupleKeyWithoutCondition {
1014                user: tuple1.user,
1015                relation: tuple1.relation,
1016                object: tuple1.object,
1017            };
1018
1019            client
1020                .write_with_options(vec![tuple2], vec![delete_key], WriteOptions::default())
1021                .await
1022                .unwrap();
1023
1024            // Verify the old tuple is gone and the new one exists
1025            let tuples = client
1026                .read_all_pages(
1027                    Some(TupleKeyWithoutCondition {
1028                        user: String::new(),
1029                        relation: "member".to_string(),
1030                        object: "team:team1".to_string(),
1031                    }),
1032                    10,
1033                    10,
1034                )
1035                .await
1036                .unwrap();
1037            assert_eq!(tuples.len(), 1);
1038            assert_eq!(tuples[0].key.as_ref().unwrap().user, "user:user2");
1039        }
1040
1041        #[tokio::test]
1042        #[traced_test]
1043        async fn test_write_with_options_empty_operations() {
1044            let client = get_client_with_custom_roles_model().await;
1045
1046            // Writing with empty writes and deletes should succeed without making a request
1047            let result = client
1048                .write_with_options(
1049                    None::<Vec<TupleKey>>,
1050                    None::<Vec<TupleKeyWithoutCondition>>,
1051                    WriteOptions::default(),
1052                )
1053                .await;
1054            assert!(result.is_ok());
1055        }
1056    }
1057}