Skip to main content

openfga_client/
model_client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use async_stream::stream;
7use futures::{StreamExt, pin_mut};
8use tonic::codegen::{Body, Bytes, StdError};
9
10use crate::{
11    client::{
12        BatchCheckItem, BatchCheckRequest, CheckRequest, CheckRequestTupleKey,
13        ConsistencyPreference, ContextualTupleKeys, ExpandRequest, ExpandRequestTupleKey,
14        ListObjectsRequest, ListObjectsResponse, OpenFgaServiceClient, ReadRequest,
15        ReadRequestTupleKey, ReadResponse, Tuple, TupleKey, TupleKeyWithoutCondition, UsersetTree,
16        WriteRequest, WriteRequestDeletes, WriteRequestWrites,
17        batch_check_single_result::CheckResult,
18    },
19    error::{Error, Result},
20};
21
22const DEFAULT_MAX_TUPLES_PER_WRITE: i32 = 100;
23
24#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
25/// Behavior when encountering conflicts during write operations.
26///
27/// See [OpenFGA documentation](https://openfga.dev/docs/getting-started/update-tuples#05-ignoring-duplicate-or-missing-tuples)
28/// for more details.
29pub enum ConflictBehavior {
30    /// Fail the operation on conflict (default behavior).
31    #[default]
32    Fail,
33    /// Ignore conflicts and continue processing.
34    Ignore,
35}
36
37impl ConflictBehavior {
38    fn as_str(&self) -> &str {
39        match self {
40            ConflictBehavior::Fail => "",
41            ConflictBehavior::Ignore => "ignore",
42        }
43    }
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47/// Options for write operations.
48///
49/// This allows you to control how OpenFGA handles duplicate writes and missing deletes.
50/// See [OpenFGA documentation](https://openfga.dev/docs/getting-started/update-tuples#05-ignoring-duplicate-or-missing-tuples)
51/// for more details.
52pub struct WriteOptions {
53    /// Behavior when writing a tuple that already exists.
54    pub on_duplicate: ConflictBehavior,
55    /// Behavior when deleting a tuple that doesn't exist.
56    pub on_missing: ConflictBehavior,
57}
58
59impl WriteOptions {
60    #[must_use]
61    pub fn new_idempotent() -> Self {
62        Self {
63            on_duplicate: ConflictBehavior::Ignore,
64            on_missing: ConflictBehavior::Ignore,
65        }
66    }
67}
68
69impl Default for WriteOptions {
70    fn default() -> Self {
71        Self {
72            on_duplicate: ConflictBehavior::Fail,
73            on_missing: ConflictBehavior::Fail,
74        }
75    }
76}
77
78#[derive(Clone, Debug)]
79/// Wrapper around the generated [`OpenFgaServiceClient`].
80///
81/// Why you should use this wrapper:
82///
83/// * Handles the `store_id` and `authorization_model_id` for you - you don't need to pass them in every request
84/// * Applies the same configured `consistency` to all requests
85/// * Ensures the number of writes and deletes does not exceed OpenFGA's limit
86/// * Uses tracing to log errors
87/// * Never sends empty writes or deletes, which fails on OpenFGA
88/// * Uses `impl Into<ReadRequestTupleKey>` arguments instead of very specific types like [`ReadRequestTupleKey`]
89/// * Most methods don't require mutable access to the client. Cloning tonic clients is cheap.
90/// * If a method is missing, the [`OpenFgaClient::client()`] provides access to the underlying client with full control
91///
92/// # Example
93///
94/// ```no_run
95/// use openfga_client::client::{OpenFgaServiceClient, OpenFgaClient};
96/// use tonic::transport::Channel;
97///
98/// #[tokio::main]
99/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
100///     let endpoint = "http://localhost:8080";
101///     let service_client = OpenFgaServiceClient::connect(endpoint).await?;
102///     let client = OpenFgaClient::new(service_client, "<store_id>", "<authorization_model_id>");
103///
104///     // Use the client to interact with OpenFGA
105///     Ok(())
106/// }
107/// ```
108pub struct OpenFgaClient<T> {
109    client: OpenFgaServiceClient<T>,
110    inner: Arc<ModelClientInner>,
111}
112
113#[derive(Debug, Clone)]
114struct ModelClientInner {
115    store_id: String,
116    authorization_model_id: String,
117    max_tuples_per_write: i32,
118    consistency: ConsistencyPreference,
119}
120
121#[cfg(feature = "auth-middle")]
122/// Specialization of the [`OpenFgaClient`] that includes optional
123/// authentication with pre-shared keys (Bearer tokens) or client credentials.
124/// For more fine-granular control, construct [`OpenFgaClient`] directly
125/// with a custom [`OpenFgaServiceClient`].
126pub type BasicOpenFgaClient = OpenFgaClient<crate::client::BasicAuthLayer>;
127
128impl<T> OpenFgaClient<T>
129where
130    T: tonic::client::GrpcService<tonic::body::Body>,
131    T::Error: Into<StdError>,
132    T::ResponseBody: Body<Data = Bytes> + Send + 'static,
133    <T::ResponseBody as Body>::Error: Into<StdError> + Send,
134    T: Clone,
135{
136    /// Create a new `OpenFgaModelClient` with the given `store_id` and `authorization_model_id`.
137    #[must_use]
138    pub fn new(
139        client: OpenFgaServiceClient<T>,
140        store_id: &str,
141        authorization_model_id: &str,
142    ) -> Self {
143        OpenFgaClient {
144            client,
145            inner: Arc::new(ModelClientInner {
146                store_id: store_id.to_string(),
147                authorization_model_id: authorization_model_id.to_string(),
148                max_tuples_per_write: DEFAULT_MAX_TUPLES_PER_WRITE,
149                consistency: ConsistencyPreference::MinimizeLatency,
150            }),
151        }
152    }
153
154    /// Set the `max_tuples_per_write` for the client.
155    #[must_use]
156    pub fn set_max_tuples_per_write(mut self, max_tuples_per_write: i32) -> Self {
157        let inner = Arc::unwrap_or_clone(self.inner);
158        self.inner = Arc::new(ModelClientInner {
159            store_id: inner.store_id,
160            authorization_model_id: inner.authorization_model_id,
161            max_tuples_per_write,
162            consistency: inner.consistency,
163        });
164        self
165    }
166
167    /// Set the `consistency` for the client.
168    #[must_use]
169    pub fn set_consistency(mut self, consistency: impl Into<ConsistencyPreference>) -> Self {
170        let inner = Arc::unwrap_or_clone(self.inner);
171        self.inner = Arc::new(ModelClientInner {
172            store_id: inner.store_id,
173            authorization_model_id: inner.authorization_model_id,
174            max_tuples_per_write: inner.max_tuples_per_write,
175            consistency: consistency.into(),
176        });
177        self
178    }
179
180    /// Get the `store_id` of the client.
181    pub fn store_id(&self) -> &str {
182        &self.inner.store_id
183    }
184
185    /// Get the `authorization_model_id` of the client.
186    pub fn authorization_model_id(&self) -> &str {
187        &self.inner.authorization_model_id
188    }
189
190    /// Get the `max_tuples_per_write` of the client.
191    pub fn max_tuples_per_write(&self) -> i32 {
192        self.inner.max_tuples_per_write
193    }
194
195    /// Get the underlying `OpenFgaServiceClient`.
196    pub fn client(&self) -> OpenFgaServiceClient<T> {
197        self.client.clone()
198    }
199
200    /// Get the `consistency` of the client.
201    pub fn consistency(&self) -> ConsistencyPreference {
202        self.inner.consistency
203    }
204
205    /// Write or delete tuples from FGA.
206    /// This is a wrapper around [`OpenFgaServiceClient::write`] that ensures that:
207    ///
208    /// * Ensures the number of writes and deletes does not exceed OpenFGA's limit
209    /// * Does not send empty writes or deletes
210    /// * Traces any errors that occur
211    /// * Enriches the error with the `write_request` that caused the error
212    ///
213    /// All writes happen in a single transaction.
214    ///
215    /// OpenFGA currently has a default limit of 100 tuples per write
216    /// (sum of writes and deletes).
217    ///
218    /// This `write` method will fail if the number of writes and deletes exceeds
219    /// `max_tuples_per_write` which defaults to 100.
220    /// To change this limit, use [`Self::set_max_tuples_per_write`].
221    ///
222    /// # Errors
223    /// * [`Error::TooManyWrites`] if the number of writes and deletes exceeds `max_tuples_per_write`
224    /// * [`Error::RequestFailed`] if the write request fails
225    ///
226    pub async fn write(
227        &self,
228        writes: impl Into<Option<Vec<TupleKey>>>,
229        deletes: impl Into<Option<Vec<TupleKeyWithoutCondition>>>,
230    ) -> Result<()> {
231        self.write_with_options(writes, deletes, WriteOptions::default())
232            .await
233    }
234
235    /// Write or delete tuples from FGA with custom conflict handling options.
236    /// This is a wrapper around [`OpenFgaServiceClient::write`] that ensures that:
237    ///
238    /// * Ensures the number of writes and deletes does not exceed OpenFGA's limit
239    /// * Does not send empty writes or deletes
240    /// * Traces any errors that occur
241    /// * Enriches the error with the `write_request` that caused the error
242    /// * Allows configuring behavior for duplicate writes and missing deletes
243    ///
244    /// All writes happen in a single transaction.
245    ///
246    /// OpenFGA currently has a default limit of 100 tuples per write
247    /// (sum of writes and deletes).
248    ///
249    /// This `write_with_options` method will fail if the number of writes and deletes exceeds
250    /// `max_tuples_per_write` which defaults to 100.
251    /// To change this limit, use [`Self::set_max_tuples_per_write`].
252    ///
253    /// # Example
254    ///
255    /// ```no_run
256    /// use openfga_client::client::{ConflictBehavior, WriteOptions, OpenFgaClient, OpenFgaServiceClient};
257    ///
258    /// #[tokio::main]
259    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
260    ///     let endpoint = "http://localhost:8080";
261    ///     let service_client = OpenFgaServiceClient::connect(endpoint).await?;
262    ///     let client = OpenFgaClient::new(service_client, "store_id", "model_id");
263    ///     
264    ///     let options = WriteOptions {
265    ///         on_duplicate: ConflictBehavior::Ignore,
266    ///         on_missing: ConflictBehavior::Ignore,
267    ///     };
268    ///
269    ///     let writes = vec![/* TupleKey instances */];
270    ///     client.write_with_options(writes, None, options).await?;
271    ///     Ok(())
272    /// }
273    /// ```
274    ///
275    /// # Errors
276    /// * [`Error::TooManyWrites`] if the number of writes and deletes exceeds `max_tuples_per_write`
277    /// * [`Error::RequestFailed`] if the write request fails
278    ///
279    pub async fn write_with_options(
280        &self,
281        writes: impl Into<Option<Vec<TupleKey>>>,
282        deletes: impl Into<Option<Vec<TupleKeyWithoutCondition>>>,
283        options: WriteOptions,
284    ) -> Result<()> {
285        let writes = writes.into().and_then(|w| (!w.is_empty()).then_some(w));
286        let deletes = deletes.into().and_then(|d| (!d.is_empty()).then_some(d));
287
288        if writes.is_none() && deletes.is_none() {
289            return Ok(());
290        }
291
292        let num_writes_and_deletes = i32::try_from(
293            #[allow(clippy::manual_saturating_arithmetic)]
294            writes
295                .as_ref()
296                .map_or(0, Vec::len)
297                .checked_add(deletes.as_ref().map_or(0, Vec::len))
298                .unwrap_or(usize::MAX),
299        )
300        .unwrap_or(i32::MAX);
301
302        if num_writes_and_deletes > self.max_tuples_per_write() {
303            tracing::error!(
304                "Too many writes and deletes in single OpenFGA transaction (actual) {} > {} (max)",
305                num_writes_and_deletes,
306                self.max_tuples_per_write()
307            );
308            return Err(Error::TooManyWrites {
309                actual: num_writes_and_deletes,
310                max: self.max_tuples_per_write(),
311            });
312        }
313
314        let write_request = WriteRequest {
315            store_id: self.store_id().to_string(),
316            writes: writes.map(|writes| WriteRequestWrites {
317                tuple_keys: writes,
318                on_duplicate: options.on_duplicate.as_str().to_string(),
319            }),
320            deletes: deletes.map(|deletes| WriteRequestDeletes {
321                on_missing: options.on_missing.as_str().to_string(),
322                tuple_keys: deletes,
323            }),
324            authorization_model_id: self.authorization_model_id().to_string(),
325        };
326
327        self.client
328            .clone()
329            .write(write_request.clone())
330            .await
331            .map_err(|e| {
332                let write_request_debug = format!("{write_request:?}");
333                tracing::error!(
334                    "Write request failed with status {e}. Request: {write_request_debug}"
335                );
336                Error::RequestFailed(Box::new(e))
337            })
338            .map(|_| ())
339    }
340
341    /// Read tuples from OpenFGA, single page.
342    ///
343    /// `tuple_key` may be:
344    ///
345    /// * A specific [`ReadRequestTupleKey`] — returns tuples matching that
346    ///   filter. The OpenFGA server requires the filter to specify either a
347    ///   non-empty `user` or a non-empty object id (a bare `"type:"` prefix is
348    ///   *not* enough on its own).
349    /// * `None` — returns **every tuple in the store**, no filter applied.
350    ///   This is the same primitive used by the OpenFGA CLI's
351    ///   `fga store export` and is the only way to enumerate tuples without
352    ///   already knowing every user/object id.
353    ///
354    /// Note that OpenFGA's `Read` RPC caps `page_size` at 100 (proto-level
355    /// validation, not configurable). Larger requested values are rejected.
356    ///
357    /// This is a wrapper around [`OpenFgaServiceClient::read`] that:
358    ///
359    /// * Traces any errors that occur
360    /// * Enriches the error with the `read_request` that caused the error
361    ///
362    /// # Example
363    ///
364    /// ```no_run
365    /// use openfga_client::client::{OpenFgaClient, OpenFgaServiceClient, ReadRequestTupleKey};
366    ///
367    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
368    /// let service_client = OpenFgaServiceClient::connect("http://localhost:8080").await?;
369    /// let client = OpenFgaClient::new(service_client, "store_id", "model_id");
370    ///
371    /// // Filtered read — bare ReadRequestTupleKey is auto-wrapped via Into<Option<_>>.
372    /// let _filtered = client
373    ///     .read(
374    ///         100,
375    ///         ReadRequestTupleKey {
376    ///             user: "user:alice".to_string(),
377    ///             relation: "member".to_string(),
378    ///             object: "team:".to_string(),
379    ///         },
380    ///         None::<String>,
381    ///     )
382    ///     .await?;
383    ///
384    /// // Unfiltered — pass `None` to enumerate everything in the store.
385    /// let _all = client.read(100, None, None::<String>).await?;
386    /// # Ok(()) }
387    /// ```
388    ///
389    /// # Errors
390    /// * [`Error::RequestFailed`] if the read request fails
391    pub async fn read(
392        &self,
393        page_size: i32,
394        tuple_key: impl Into<Option<ReadRequestTupleKey>>,
395        continuation_token: impl Into<Option<String>>,
396    ) -> Result<tonic::Response<ReadResponse>> {
397        let read_request = ReadRequest {
398            store_id: self.store_id().to_string(),
399            page_size: Some(page_size),
400            continuation_token: continuation_token.into().unwrap_or_default(),
401            tuple_key: tuple_key.into(),
402            consistency: self.consistency().into(),
403        };
404        self.client
405            .clone()
406            .read(read_request.clone())
407            .await
408            .map_err(|e| {
409                let read_request_debug = format!("{read_request:?}");
410                tracing::error!(
411                    "Read request failed with status {e}. Request: {read_request_debug}"
412                );
413                Error::RequestFailed(Box::new(e))
414            })
415    }
416
417    /// Read all tuples, with pagination.
418    ///
419    /// `tuple` may be:
420    ///
421    /// * `Some(filter)` — returns tuples matching the filter (paginated). Same
422    ///   server-side filter requirements as [`Self::read`].
423    /// * `None` — **enumerates every tuple in the store**, paginating to
424    ///   completion. This is the supported way to back up or audit a store and
425    ///   is what the OpenFGA CLI's `fga store export` uses internally.
426    ///
427    /// For details on the other parameters, see
428    /// [`OpenFgaServiceClient::read_all_pages`].
429    ///
430    /// # Errors
431    /// * [`Error::RequestFailed`] If a request to OpenFGA fails.
432    /// * [`Error::TooManyPages`] If the number of pages read exceeds `max_pages`.
433    ///
434    pub async fn read_all_pages(
435        &self,
436        tuple: Option<impl Into<ReadRequestTupleKey>>,
437        page_size: i32,
438        max_pages: u32,
439    ) -> Result<Vec<Tuple>> {
440        let store_id = self.store_id().to_string();
441        self.client
442            .clone()
443            .read_all_pages(&store_id, tuple, self.consistency(), page_size, max_pages)
444            .await
445    }
446
447    /// Perform a check.
448    /// Returns `true` if the check is allowed, `false` otherwise.
449    ///
450    /// # Errors
451    /// * [`Error::RequestFailed`] if the check request fails
452    ///
453    pub async fn check(
454        &self,
455        tuple_key: impl Into<CheckRequestTupleKey>,
456        contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
457        context: impl Into<Option<prost_wkt_types::Struct>>,
458        trace: bool,
459    ) -> Result<bool> {
460        let contextual_tuples = contextual_tuples
461            .into()
462            .and_then(|c| (!c.is_empty()).then_some(c))
463            .map(|tuple_keys| ContextualTupleKeys { tuple_keys });
464
465        let check_request = CheckRequest {
466            store_id: self.store_id().to_string(),
467            tuple_key: Some(tuple_key.into()),
468            consistency: self.consistency().into(),
469            contextual_tuples,
470            authorization_model_id: self.authorization_model_id().to_string(),
471            context: context.into(),
472            trace,
473        };
474        let response = self
475            .client
476            .clone()
477            .check(check_request.clone())
478            .await
479            .map_err(|e| {
480                let check_request_debug = format!("{check_request:?}");
481                tracing::error!(
482                    "Check request failed with status {e}. Request: {check_request_debug}"
483                );
484                Error::RequestFailed(Box::new(e))
485            })?;
486        Ok(response.get_ref().allowed)
487    }
488
489    /// Check multiple tuples at once.
490    /// Returned `HashMap` contains one key for each `correlation_id` in the input.
491    ///
492    /// # Errors
493    /// * [`Error::RequestFailed`] if the check request fails
494    /// * [`Error::ExpectedOneof`] if the server unexpectedly returns `None` for one of the tuples
495    ///   to check.
496    pub async fn batch_check<I>(
497        &self,
498        checks: impl IntoIterator<Item = I>,
499    ) -> Result<HashMap<String, CheckResult>>
500    where
501        I: Into<BatchCheckItem>,
502    {
503        let checks: Vec<BatchCheckItem> = checks.into_iter().map(Into::into).collect();
504        let request = BatchCheckRequest {
505            store_id: self.store_id().to_string(),
506            checks,
507            authorization_model_id: self.authorization_model_id().to_string(),
508            consistency: self.consistency().into(),
509        };
510
511        let response = self
512            .client
513            .clone()
514            .batch_check(request.clone())
515            .await
516            .map_err(|e| {
517                let request_debug = format!("{request:?}");
518                tracing::error!(
519                    "Batch-Check request failed with status {e}. Request: {request_debug}"
520                );
521                Error::RequestFailed(Box::new(e))
522            })?;
523
524        let mut map = HashMap::new();
525        for (k, v) in response.into_inner().result {
526            match v.check_result {
527                // The server should return `Some(_)` for every tuple to check.
528                // `None` is not expected to occur, hence returning an error for the *entire*
529                // batch request to keep the API simple.
530                Some(v) => map.insert(k, v),
531                None => return Err(Error::ExpectedOneof),
532            };
533        }
534        Ok(map)
535    }
536
537    /// Expand all relationships in userset tree format.
538    /// Useful to reason about and debug a certain relationship.
539    ///
540    /// # Errors
541    /// * [`Error::RequestFailed`] if the expand request fails
542    ///
543    pub async fn expand(
544        &self,
545        tuple_key: impl Into<ExpandRequestTupleKey>,
546        contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
547    ) -> Result<Option<UsersetTree>> {
548        let expand_request = ExpandRequest {
549            store_id: self.store_id().to_string(),
550            tuple_key: Some(tuple_key.into()),
551            authorization_model_id: self.authorization_model_id().to_string(),
552            consistency: self.consistency().into(),
553            contextual_tuples: contextual_tuples
554                .into()
555                .map(|tuple_keys| ContextualTupleKeys { tuple_keys }),
556        };
557        let response = self
558            .client
559            .clone()
560            .expand(expand_request.clone())
561            .await
562            .map_err(|e| {
563                tracing::error!(
564                    "Expand request failed with status {e}. Request: {expand_request:?}"
565                );
566                Error::RequestFailed(Box::new(e))
567            })?;
568        Ok(response.into_inner().tree)
569    }
570
571    /// Simplified version of [`Self::check`] without contextual tuples, context, or trace.
572    ///
573    /// # Errors
574    /// Check the [`Self::check`] method for possible errors.
575    pub async fn check_simple(&self, tuple_key: impl Into<CheckRequestTupleKey>) -> Result<bool> {
576        self.check(tuple_key, None, None, false).await
577    }
578
579    /// List all objects of the given type that the user has a relation with.
580    ///
581    /// # Errors
582    /// * [`Error::RequestFailed`] if the list-objects request fails
583    pub async fn list_objects(
584        &self,
585        r#type: impl Into<String>,
586        relation: impl Into<String>,
587        user: impl Into<String>,
588        contextual_tuples: impl Into<Option<Vec<TupleKey>>>,
589        context: impl Into<Option<prost_wkt_types::Struct>>,
590    ) -> Result<tonic::Response<ListObjectsResponse>> {
591        let request = ListObjectsRequest {
592            r#type: r#type.into(),
593            relation: relation.into(),
594            user: user.into(),
595            authorization_model_id: self.authorization_model_id().to_string(),
596            store_id: self.store_id().to_string(),
597            consistency: self.consistency().into(),
598            contextual_tuples: contextual_tuples
599                .into()
600                .map(|tuple_keys| ContextualTupleKeys { tuple_keys }),
601            context: context.into(),
602        };
603
604        self.client
605            .clone()
606            .list_objects(request.clone())
607            .await
608            .map_err(|e| {
609                tracing::error!(
610                    "List-Objects request failed with status {e}. Request: {request:?}"
611                );
612                Error::RequestFailed(Box::new(e))
613            })
614    }
615
616    /// Delete all relations that other entities have to the given `object`, that
617    /// is, all tuples with the "object" field set to the given `object`.
618    ///
619    /// This method uses streamed pagination internally, so that also large amounts of tuples can be deleted.
620    /// Please not that this method does not delete tuples where the given object has a relation TO another entity.
621    ///
622    /// Iteration is stopped when no more tuples are returned from OpenFGA.
623    ///
624    /// # Errors
625    /// * [`Error::RequestFailed`] if a read or delete request fails
626    ///
627    pub async fn delete_relations_to_object(&self, object: &str) -> Result<()> {
628        loop {
629            self.delete_relations_to_object_inner(object)
630                .await
631                .inspect_err(|e| {
632                    tracing::error!("Failed to delete relations to object {object}: {e}");
633                })?;
634
635            if self.exists_relation_to(object).await? {
636                tracing::debug!(
637                    "Some tuples for object {object} are still present after first sweep. Performing another deletion."
638                );
639            } else {
640                tracing::debug!("Successfully deleted all relations to object {object}");
641                break Ok(());
642            }
643        }
644    }
645
646    /// Check if any direct relation to the given object exists.
647    /// This does not check if the object is used as a user in relations to other objects.
648    ///
649    /// # Errors
650    /// * [`Error::RequestFailed`] if the read request fails
651    pub async fn exists_relation_to(&self, object: &str) -> Result<bool> {
652        let tuples = self.read_relations_to_object(object, None, 1).await?;
653        Ok(!tuples.tuples.is_empty())
654    }
655
656    async fn read_relations_to_object(
657        &self,
658        object: &str,
659        continuation_token: impl Into<Option<String>>,
660        page_size: i32,
661    ) -> Result<ReadResponse> {
662        self.read(
663            page_size,
664            TupleKeyWithoutCondition {
665                user: String::new(),
666                relation: String::new(),
667                object: object.to_string(),
668            },
669            continuation_token,
670        )
671        .await
672        .map(tonic::Response::into_inner)
673    }
674
675    /// # Errors
676    /// * [`Error::RequestFailed`] if a read or delete request fails
677    ///
678    async fn delete_relations_to_object_inner(&self, object: &str) -> Result<()> {
679        let read_stream = stream! {
680            let mut continuation_token = None;
681            // We need to keep track of seen keys, as OpenFGA might return
682            // duplicates even of `HigherConsistency`.
683            let mut seen= HashSet::new();
684            while continuation_token != Some(String::new()) {
685                let response = self.read_relations_to_object(object, continuation_token, self.max_tuples_per_write()).await?;
686                let keys = response.tuples.into_iter().filter_map(|t| t.key).filter(|k| !seen.contains(&(k.user.clone(), k.relation.clone()))).collect::<Vec<_>>();
687                tracing::debug!("Read {} keys for object {object} that are up for deletion. Continuation token: {}", keys.len(), response.continuation_token);
688                continuation_token = Some(response.continuation_token);
689                seen.extend(keys.iter().map(|k| (k.user.clone(), k.relation.clone())));
690                yield Result::Ok(keys);
691            }
692        };
693        pin_mut!(read_stream);
694        let mut read_tuples: Option<Vec<TupleKey>> = None;
695
696        let delete_tuples = |t: Option<Vec<TupleKey>>| async {
697            match t {
698                Some(tuples) => {
699                    tracing::debug!(
700                        "Deleting {} tuples for object {object} that we haven't seen before.",
701                        tuples.len()
702                    );
703                    self.write(
704                        None,
705                        Some(
706                            tuples
707                                .into_iter()
708                                .map(|t| TupleKeyWithoutCondition {
709                                    user: t.user,
710                                    relation: t.relation,
711                                    object: t.object,
712                                })
713                                .collect(),
714                        ),
715                    )
716                    .await
717                }
718                None => Ok(()),
719            }
720        };
721
722        loop {
723            let next_future = read_stream.next();
724            let deletion_future = delete_tuples(read_tuples.clone());
725
726            let (tuples, delete) = futures::join!(next_future, deletion_future);
727            delete?;
728
729            if let Some(tuples) = tuples.transpose()? {
730                read_tuples = (!tuples.is_empty()).then_some(tuples);
731            } else {
732                break Ok(());
733            }
734        }
735    }
736}
737
738#[cfg(test)]
739mod tests {
740    use needs_env_var::needs_env_var;
741
742    #[needs_env_var(TEST_OPENFGA_CLIENT_GRPC_URL)]
743    mod openfga {
744        use tracing_test::traced_test;
745
746        use super::super::*;
747        use crate::{
748            client::{AuthorizationModel, Store},
749            migration::test::openfga::service_client_with_store,
750        };
751
752        async fn write_custom_roles_model(
753            client: &OpenFgaServiceClient<tonic::transport::Channel>,
754            store: &Store,
755        ) -> String {
756            let model: AuthorizationModel = serde_json::from_str(include_str!(
757                "../tests/sample-store/custom-roles/schema.json"
758            ))
759            .unwrap();
760            client
761                .clone()
762                .write_authorization_model(model.into_write_request(store.id.clone()))
763                .await
764                .unwrap()
765                .into_inner()
766                .authorization_model_id
767        }
768
769        async fn get_client_with_custom_roles_model() -> OpenFgaClient<tonic::transport::Channel> {
770            let (service_client, store) = service_client_with_store().await;
771            let auth_model_id = write_custom_roles_model(&service_client, &store).await;
772
773            OpenFgaClient::new(service_client, &store.id, auth_model_id.as_str())
774        }
775
776        /// Verifies that the single-page [`OpenFgaClient::read`] honours
777        /// `tuple_key=None` as "no filter" and returns store-wide tuples,
778        /// providing a continuation token when more pages are available.
779        #[tokio::test]
780        #[traced_test]
781        async fn test_read_single_page_unfiltered() {
782            let client = get_client_with_custom_roles_model().await;
783
784            // Write 75 tuples — small enough to fit in one page of 100 but
785            // larger than a page of 50 so we also exercise the continuation token.
786            let total = 75;
787            for i in 0..total {
788                client
789                    .write(
790                        vec![TupleKey {
791                            user: format!("user:user{i}"),
792                            relation: "member".to_string(),
793                            object: "team:team1".to_string(),
794                            condition: None,
795                        }],
796                        None,
797                    )
798                    .await
799                    .unwrap();
800            }
801
802            // page_size=100: one page, no continuation, all 75 returned.
803            let resp = client
804                .read(100, None, None::<String>)
805                .await
806                .expect("read with None tuple_key must succeed");
807            let inner = resp.into_inner();
808            assert_eq!(inner.tuples.len(), total);
809            assert!(
810                inner.continuation_token.is_empty(),
811                "continuation token must be empty when all results fit in one page"
812            );
813
814            // page_size=50: first page returns 50, with a non-empty continuation.
815            let resp = client
816                .read(50, None, None::<String>)
817                .await
818                .expect("read with None tuple_key must succeed");
819            let inner = resp.into_inner();
820            assert_eq!(inner.tuples.len(), 50);
821            assert!(
822                !inner.continuation_token.is_empty(),
823                "continuation token must be set when more pages are available"
824            );
825
826            // Follow the continuation; remaining 25 tuples come back, no further pages.
827            let resp = client
828                .read(50, None, Some(inner.continuation_token))
829                .await
830                .expect("read with continuation token must succeed");
831            let inner = resp.into_inner();
832            assert_eq!(inner.tuples.len(), total - 50);
833            assert!(inner.continuation_token.is_empty());
834        }
835
836        /// Verifies that the bare-`ReadRequestTupleKey` call pattern (used
837        /// throughout this codebase before the signature was widened to accept
838        /// `Option<...>`) still compiles and works. Relies on the
839        /// `T: Into<Option<T>>` blanket impl.
840        #[tokio::test]
841        #[traced_test]
842        async fn test_read_single_page_filtered_backward_compat() {
843            let client = get_client_with_custom_roles_model().await;
844
845            client
846                .write(
847                    vec![
848                        TupleKey {
849                            user: "user:alice".to_string(),
850                            relation: "member".to_string(),
851                            object: "team:team1".to_string(),
852                            condition: None,
853                        },
854                        TupleKey {
855                            user: "user:bob".to_string(),
856                            relation: "member".to_string(),
857                            object: "team:team2".to_string(),
858                            condition: None,
859                        },
860                    ],
861                    None,
862                )
863                .await
864                .unwrap();
865
866            // Pass `ReadRequestTupleKey` directly — no `Some(...)` wrap.
867            let resp = client
868                .read(
869                    100,
870                    ReadRequestTupleKey {
871                        user: String::new(),
872                        relation: "member".to_string(),
873                        object: "team:team1".to_string(),
874                    },
875                    None::<String>,
876                )
877                .await
878                .unwrap();
879            let inner = resp.into_inner();
880            assert_eq!(inner.tuples.len(), 1);
881            assert_eq!(inner.tuples[0].key.as_ref().unwrap().user, "user:alice");
882        }
883
884        /// Verifies that all pages are read when *not* passing a `ReadRequestTupleKey`.
885        #[tokio::test]
886        #[traced_test]
887        async fn test_read_all_pages_empty_tuple() {
888            let client = get_client_with_custom_roles_model().await;
889
890            let loop_count = 100;
891            let tuples_per_loop = 3;
892            for i in 0..loop_count {
893                // Write to different relations with different users and objects to test that an
894                // empty ReadRequestTupleKey does not filter for anything.
895                client
896                    .write(
897                        vec![
898                            TupleKey {
899                                user: format!("user:user{i}"),
900                                relation: "member".to_string(),
901                                object: "team:team1".to_string(),
902                                condition: None,
903                            },
904                            TupleKey {
905                                user: format!("role:role{i}#assignee"),
906                                relation: "role_assigner".to_string(),
907                                object: "org:org1".to_string(),
908                                condition: None,
909                            },
910                            TupleKey {
911                                user: format!("org:org{i}"),
912                                relation: "org".to_string(),
913                                object: "asset-category:ac{i}".to_string(),
914                                condition: None,
915                            },
916                        ],
917                        None,
918                    )
919                    .await
920                    .unwrap();
921            }
922
923            let tuples = client
924                .read_all_pages(None::<ReadRequestTupleKey>, 50, u32::MAX)
925                .await
926                .unwrap();
927            assert_eq!(tuples.len(), loop_count * tuples_per_loop);
928        }
929
930        #[tokio::test]
931        #[traced_test]
932        async fn test_delete_relations_to_object() {
933            let client = get_client_with_custom_roles_model().await;
934            let object = "team:team1";
935
936            assert!(!client.exists_relation_to(object).await.unwrap());
937
938            client
939                .write(
940                    vec![TupleKey {
941                        user: "user:user1".to_string(),
942                        relation: "member".to_string(),
943                        object: object.to_string(),
944                        condition: None,
945                    }],
946                    None,
947                )
948                .await
949                .unwrap();
950            assert!(client.exists_relation_to(object).await.unwrap());
951            client.delete_relations_to_object(object).await.unwrap();
952            assert!(!client.exists_relation_to(object).await.unwrap());
953        }
954
955        #[tokio::test]
956        #[traced_test]
957        async fn test_delete_relations_to_object_usersets() {
958            let client = get_client_with_custom_roles_model().await;
959            let object: &str = "role:admin";
960
961            assert!(!client.exists_relation_to(object).await.unwrap());
962
963            client
964                .write(
965                    vec![TupleKey {
966                        user: "team:team1#member".to_string(),
967                        relation: "assignee".to_string(),
968                        object: object.to_string(),
969                        condition: None,
970                    }],
971                    None,
972                )
973                .await
974                .unwrap();
975            assert!(client.exists_relation_to(object).await.unwrap());
976            client.delete_relations_to_object(object).await.unwrap();
977            assert!(!client.exists_relation_to(object).await.unwrap());
978        }
979
980        #[tokio::test]
981        #[traced_test]
982        async fn test_delete_relations_to_object_empty() {
983            let client = get_client_with_custom_roles_model().await;
984            let object = "team:team1";
985
986            assert!(!client.exists_relation_to(object).await.unwrap());
987            client.delete_relations_to_object(object).await.unwrap();
988            assert!(!client.exists_relation_to(object).await.unwrap());
989        }
990
991        #[tokio::test]
992        #[traced_test]
993        async fn test_delete_relations_to_object_many() {
994            let client = get_client_with_custom_roles_model().await;
995            let object = "org:org1";
996
997            assert!(!client.exists_relation_to(object).await.unwrap());
998
999            for i in 0..502 {
1000                client
1001                    .write(
1002                        vec![
1003                            TupleKey {
1004                                user: format!("user:user{i}"),
1005                                relation: "member".to_string(),
1006                                object: object.to_string(),
1007                                condition: None,
1008                            },
1009                            TupleKey {
1010                                user: format!("role:role{i}#assignee"),
1011                                relation: "role_assigner".to_string(),
1012                                object: object.to_string(),
1013                                condition: None,
1014                            },
1015                        ],
1016                        None,
1017                    )
1018                    .await
1019                    .unwrap();
1020            }
1021
1022            // Also write a tuple for another org to make sure we don't delete those
1023            let object_2 = "org:org2";
1024            client
1025                .write(
1026                    vec![TupleKey {
1027                        user: "user:user1".to_string(),
1028                        relation: "owner".to_string(),
1029                        object: object_2.to_string(),
1030                        condition: None,
1031                    }],
1032                    None,
1033                )
1034                .await
1035                .unwrap();
1036
1037            assert!(client.exists_relation_to(object).await.unwrap());
1038            assert!(client.exists_relation_to(object_2).await.unwrap());
1039
1040            client.delete_relations_to_object(object).await.unwrap();
1041
1042            assert!(!client.exists_relation_to(object).await.unwrap());
1043            assert!(client.exists_relation_to(object_2).await.unwrap());
1044            assert!(
1045                client
1046                    .check_simple(TupleKeyWithoutCondition {
1047                        user: "user:user1".to_string(),
1048                        relation: "role_assigner".to_string(),
1049                        object: object_2.to_string(),
1050                    })
1051                    .await
1052                    .unwrap()
1053            );
1054        }
1055
1056        #[tokio::test]
1057        #[traced_test]
1058        async fn test_write_with_options_ignore_duplicate() {
1059            let client = get_client_with_custom_roles_model().await;
1060            let tuple = TupleKey {
1061                user: "user:user1".to_string(),
1062                relation: "member".to_string(),
1063                object: "team:team1".to_string(),
1064                condition: None,
1065            };
1066
1067            // First write should succeed
1068            client
1069                .write_with_options(vec![tuple.clone()], None, WriteOptions::default())
1070                .await
1071                .unwrap();
1072
1073            // Second write with default options should fail
1074            let result = client
1075                .write_with_options(vec![tuple.clone()], None, WriteOptions::default())
1076                .await;
1077            assert!(result.is_err());
1078
1079            // Write with ignore duplicate should succeed
1080            let options = WriteOptions {
1081                on_duplicate: ConflictBehavior::Ignore,
1082                on_missing: ConflictBehavior::Fail,
1083            };
1084            client
1085                .write_with_options(vec![tuple], None, options)
1086                .await
1087                .unwrap();
1088        }
1089
1090        #[tokio::test]
1091        #[traced_test]
1092        async fn test_write_with_options_ignore_missing() {
1093            let client = get_client_with_custom_roles_model().await;
1094            let tuple_key = TupleKeyWithoutCondition {
1095                user: "user:user1".to_string(),
1096                relation: "member".to_string(),
1097                object: "team:team1".to_string(),
1098            };
1099
1100            // Delete non-existent tuple with default options should fail
1101            let result = client
1102                .write_with_options(None, vec![tuple_key.clone()], WriteOptions::default())
1103                .await;
1104            assert!(result.is_err());
1105
1106            // Delete with ignore missing should succeed
1107            let options = WriteOptions {
1108                on_duplicate: ConflictBehavior::Fail,
1109                on_missing: ConflictBehavior::Ignore,
1110            };
1111            client
1112                .write_with_options(None, vec![tuple_key], options)
1113                .await
1114                .unwrap();
1115        }
1116
1117        #[tokio::test]
1118        #[traced_test]
1119        async fn test_write_with_options_idempotent() {
1120            let client = get_client_with_custom_roles_model().await;
1121            let tuple = TupleKey {
1122                user: "user:user1".to_string(),
1123                relation: "member".to_string(),
1124                object: "team:team1".to_string(),
1125                condition: None,
1126            };
1127
1128            let options = WriteOptions::new_idempotent();
1129
1130            // Write twice with idempotent options should succeed both times
1131            client
1132                .write_with_options(vec![tuple.clone()], None, options)
1133                .await
1134                .unwrap();
1135            client
1136                .write_with_options(vec![tuple], None, options)
1137                .await
1138                .unwrap();
1139
1140            // Delete non-existent tuple with idempotent options should succeed
1141            let tuple_key = TupleKeyWithoutCondition {
1142                user: "user:nonexistent".to_string(),
1143                relation: "member".to_string(),
1144                object: "team:team1".to_string(),
1145            };
1146            client
1147                .write_with_options(None, vec![tuple_key], options)
1148                .await
1149                .unwrap();
1150        }
1151
1152        #[tokio::test]
1153        #[traced_test]
1154        #[allow(clippy::similar_names)]
1155        async fn test_write_with_options_mixed_operations() {
1156            let client = get_client_with_custom_roles_model().await;
1157
1158            // First, write a tuple
1159            let tuple1 = TupleKey {
1160                user: "user:user1".to_string(),
1161                relation: "member".to_string(),
1162                object: "team:team1".to_string(),
1163                condition: None,
1164            };
1165            client.write(vec![tuple1.clone()], None).await.unwrap();
1166
1167            // Now write a new tuple and delete the existing one in the same request
1168            let tuple2 = TupleKey {
1169                user: "user:user2".to_string(),
1170                relation: "member".to_string(),
1171                object: "team:team1".to_string(),
1172                condition: None,
1173            };
1174            let delete_key = TupleKeyWithoutCondition {
1175                user: tuple1.user,
1176                relation: tuple1.relation,
1177                object: tuple1.object,
1178            };
1179
1180            client
1181                .write_with_options(vec![tuple2], vec![delete_key], WriteOptions::default())
1182                .await
1183                .unwrap();
1184
1185            // Verify the old tuple is gone and the new one exists
1186            let tuples = client
1187                .read_all_pages(
1188                    Some(TupleKeyWithoutCondition {
1189                        user: String::new(),
1190                        relation: "member".to_string(),
1191                        object: "team:team1".to_string(),
1192                    }),
1193                    10,
1194                    10,
1195                )
1196                .await
1197                .unwrap();
1198            assert_eq!(tuples.len(), 1);
1199            assert_eq!(tuples[0].key.as_ref().unwrap().user, "user:user2");
1200        }
1201
1202        #[tokio::test]
1203        #[traced_test]
1204        async fn test_write_with_options_empty_operations() {
1205            let client = get_client_with_custom_roles_model().await;
1206
1207            // Writing with empty writes and deletes should succeed without making a request
1208            let result = client
1209                .write_with_options(
1210                    None::<Vec<TupleKey>>,
1211                    None::<Vec<TupleKeyWithoutCondition>>,
1212                    WriteOptions::default(),
1213                )
1214                .await;
1215            assert!(result.is_ok());
1216        }
1217    }
1218}