Skip to main content

spicedb_embedded/
spicedb.rs

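//! FFI bindings for the `SpiceDB` C-shared library (CGO).
//!
//! This module provides a thin wrapper that starts `SpiceDB` via FFI. Unary RPCs
//! (`CheckPermission`, `WriteRelationships`, `WriteSchema`, ...) are issued through the
//! in-memory FFI transport using tonic-generated request/response types. Streaming RPCs
//! (`Watch`, `ReadRelationships`, ...) are served by a streaming proxy that the C library
//! exposes over a Unix socket or TCP (always TCP on Windows); connect a tonic gRPC client to it.
//! Schema and relationships are written using the gRPC request types (not JSON).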
6
7use serde::{Deserialize, Serialize};
8use spicedb_embedded_sys::{dispose, start};
9use spicedb_grpc_tonic::v1::{
10    RelationshipUpdate, WriteRelationshipsRequest, WriteSchemaRequest,
11    relationship_update::Operation,
12};
13
14use crate::SpiceDBError;
15
16/// Response from the C library (JSON parsed)
17#[derive(Debug, Deserialize)]
18struct CResponse {
19    success: bool,
20    error: Option<String>,
21    data: Option<serde_json::Value>,
22}
23
24/// Options for starting an embedded `SpiceDB` instance.
25#[derive(Debug, Default, Clone, Serialize)]
26#[serde(rename_all = "snake_case")]
27pub struct StartOptions {
28    /// Datastore: "memory" (default), "postgres", "cockroachdb", "spanner", "mysql"
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub datastore: Option<String>,
31    /// Connection string for remote datastores. Required for postgres, cockroachdb, spanner, mysql.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub datastore_uri: Option<String>,
34    /// Path to Spanner service account JSON (Spanner only)
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub spanner_credentials_file: Option<String>,
37    /// Spanner emulator host, e.g. "localhost:9010" (Spanner only)
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub spanner_emulator_host: Option<String>,
40    /// Prefix for all tables (`MySQL` only)
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub mysql_table_prefix: Option<String>,
43    /// Primary switch for all metrics and tracing (default: false).
44    /// When false, all other observability options are ignored.
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub metrics_enabled: Option<bool>,
47    /// Enable datastore Prometheus metrics (default: true when `metrics_enabled=true`).
48    /// Only takes effect when `metrics_enabled=true`.
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub datastore_metrics_enabled: Option<bool>,
51    /// Enable cache Prometheus metrics for dispatch/namespace/cluster caches
52    /// (default: true when `metrics_enabled=true`).
53    /// Only takes effect when `metrics_enabled=true`.
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub cache_metrics_enabled: Option<bool>,
56    /// OTLP gRPC endpoint for OpenTelemetry traces, e.g. `"localhost:4317"` (insecure).
57    /// Only used when `metrics_enabled=true`.
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub otlp_endpoint: Option<String>,
60    /// If set, starts a Prometheus HTTP server on this port at `/metrics`.
61    /// Only used when `metrics_enabled=true`.
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub metrics_port: Option<u16>,
64    /// Host/IP the Prometheus HTTP server binds to (default: `"0.0.0.0"`).
65    /// Only used when `metrics_enabled=true` and `metrics_port` is set.
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub metrics_host: Option<String>,
68}
69
70/// Parses the JSON response string from the C library (start/dispose) into the inner data or error.
71fn parse_json_response(response_str: &str) -> Result<serde_json::Value, SpiceDBError> {
72    let response: CResponse = serde_json::from_str(response_str)
73        .map_err(|e| SpiceDBError::Protocol(format!("invalid JSON: {e} (raw: {response_str})")))?;
74
75    if response.success {
76        Ok(response.data.unwrap_or(serde_json::Value::Null))
77    } else {
78        Err(SpiceDBError::SpiceDB(
79            response.error.unwrap_or_else(|| "unknown error".into()),
80        ))
81    }
82}
83
84use spicedb_embedded_sys::memory_transport;
85use spicedb_grpc_tonic::v1::{
86    CheckBulkPermissionsRequest, CheckBulkPermissionsResponse, CheckPermissionRequest,
87    CheckPermissionResponse, DeleteRelationshipsRequest, DeleteRelationshipsResponse,
88    ExpandPermissionTreeRequest, ExpandPermissionTreeResponse, ReadSchemaRequest,
89    ReadSchemaResponse, WriteRelationshipsResponse, WriteSchemaResponse,
90};
91
/// Embedded `SpiceDB` instance (in-memory transport). All RPCs go through the FFI.
/// For streaming APIs (`Watch`, `ReadRelationships`, etc.) use [`streaming_address`](EmbeddedSpiceDB::streaming_address)
/// (the C library starts a streaming proxy and returns it in the start response).
///
/// The type is `Send + Sync` through the auto traits: every field is plain owned data
/// (`u64`, `String`), so no `unsafe impl` is required.
pub struct EmbeddedSpiceDB {
    /// Instance handle taken from the C library start response.
    handle: u64,
    /// Set from the C library start response; use for `Watch`, `ReadRelationships`, `LookupResources`, `LookupSubjects`.
    streaming_address: String,
    /// Set from the C library start response: "unix" or "tcp".
    streaming_transport: String,
}

// Compile-time guard: fails the build if a future field makes the type lose `Send`/`Sync`,
// instead of papering over it with `unsafe impl`.
const _: fn() = || {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<EmbeddedSpiceDB>();
};
105
106impl EmbeddedSpiceDB {
107    /// Start an embedded `SpiceDB` instance without bootstrapping schema or relationships.
108    ///
109    /// Use this when you want to manage schema/relationships yourself, or when
110    /// connecting to a pre-existing datastore that already has a schema.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the C library fails to start or returns invalid JSON.
115    pub fn start(options: Option<&StartOptions>) -> Result<Self, SpiceDBError> {
116        let opts = options.cloned().unwrap_or_default();
117        let json = serde_json::to_string(&opts)
118            .map_err(|e| SpiceDBError::Protocol(format!("serialize options: {e}")))?;
119        let response_str = start(Some(&json)).map_err(SpiceDBError::Runtime)?;
120        let data = parse_json_response(&response_str)?;
121        let handle = data
122            .get("handle")
123            .and_then(serde_json::Value::as_u64)
124            .ok_or_else(|| SpiceDBError::Protocol("missing handle in start response".into()))?;
125        let streaming_address = data
126            .get("streaming_address")
127            .and_then(serde_json::Value::as_str)
128            .map(String::from)
129            .ok_or_else(|| {
130                SpiceDBError::Protocol("missing streaming_address in start response".into())
131            })?;
132        let streaming_transport = data
133            .get("streaming_transport")
134            .and_then(serde_json::Value::as_str)
135            .map(String::from)
136            .ok_or_else(|| {
137                SpiceDBError::Protocol("missing streaming_transport in start response".into())
138            })?;
139
140        Ok(Self {
141            handle,
142            streaming_address,
143            streaming_transport,
144        })
145    }
146
147    /// Start an embedded `SpiceDB` instance and bootstrap it with schema and relationships.
148    ///
149    /// If `schema` is non-empty, writes it via `SchemaService`. If `relationships` is non-empty, writes them via `WriteRelationships`.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the C library fails to start, returns invalid JSON, or schema/relationship write fails.
154    pub fn start_with_schema(
155        schema: &str,
156        relationships: &[spicedb_grpc_tonic::v1::Relationship],
157        options: Option<&StartOptions>,
158    ) -> Result<Self, SpiceDBError> {
159        let db = Self::start(options)?;
160
161        if !schema.is_empty() {
162            memory_transport::write_schema(
163                db.handle,
164                &WriteSchemaRequest {
165                    schema: schema.to_string(),
166                },
167            )
168            .map_err(|e| SpiceDBError::SpiceDB(e.0))?;
169        }
170
171        if !relationships.is_empty() {
172            let updates: Vec<RelationshipUpdate> = relationships
173                .iter()
174                .map(|r| RelationshipUpdate {
175                    operation: Operation::Touch as i32,
176                    relationship: Some(r.clone()),
177                })
178                .collect();
179            memory_transport::write_relationships(
180                db.handle,
181                &WriteRelationshipsRequest {
182                    updates,
183                    optional_preconditions: vec![],
184                    optional_transaction_metadata: None,
185                },
186            )
187            .map_err(|e| SpiceDBError::SpiceDB(e.0))?;
188        }
189
190        Ok(db)
191    }
192
193    /// Deprecated: use [`start_with_schema`](Self::start_with_schema) instead.
194    ///
195    /// # Errors
196    ///
197    /// See [`start_with_schema`](Self::start_with_schema).
198    #[deprecated(since = "0.6.0", note = "renamed to start_with_schema")]
199    pub fn new(
200        schema: &str,
201        relationships: &[spicedb_grpc_tonic::v1::Relationship],
202        options: Option<&StartOptions>,
203    ) -> Result<Self, SpiceDBError> {
204        Self::start_with_schema(schema, relationships, options)
205    }
206
207    /// Permissions service (`CheckPermission`, `WriteRelationships`, `DeleteRelationships`, etc.).
208    #[must_use]
209    pub const fn permissions(&self) -> MemoryPermissionsClient {
210        MemoryPermissionsClient {
211            handle: self.handle,
212        }
213    }
214
215    /// Schema service (`ReadSchema`, `WriteSchema`).
216    #[must_use]
217    pub const fn schema(&self) -> MemorySchemaClient {
218        MemorySchemaClient {
219            handle: self.handle,
220        }
221    }
222
223    /// Raw handle for advanced use (e.g. with [`spicedb_embedded_sys::memory_transport`]).
224    #[must_use]
225    pub const fn handle(&self) -> u64 {
226        self.handle
227    }
228
229    /// Returns the address for streaming APIs (`Watch`, `ReadRelationships`, `LookupResources`, `LookupSubjects`).
230    /// Set from the C library start response when the streaming proxy was started.
231    #[must_use]
232    pub fn streaming_address(&self) -> &str {
233        &self.streaming_address
234    }
235
236    /// Streaming proxy transport: "unix" or "tcp" (from C library start response).
237    #[must_use]
238    pub fn streaming_transport(&self) -> &str {
239        &self.streaming_transport
240    }
241}
242
243impl Drop for EmbeddedSpiceDB {
244    fn drop(&mut self) {
245        let _ = dispose(self.handle);
246    }
247}
248
249/// Permissions service client for memory transport. All methods are synchronous and use the -sys safe layer.
250pub struct MemoryPermissionsClient {
251    handle: u64,
252}
253
254impl MemoryPermissionsClient {
255    /// `CheckPermission`.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if the FFI call fails or the response cannot be decoded.
260    pub fn check_permission(
261        &self,
262        request: &CheckPermissionRequest,
263    ) -> Result<CheckPermissionResponse, SpiceDBError> {
264        memory_transport::check_permission(self.handle, request)
265            .map_err(|e| SpiceDBError::SpiceDB(e.0))
266    }
267
268    /// `WriteRelationships`.
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if the FFI call fails or the response cannot be decoded.
273    pub fn write_relationships(
274        &self,
275        request: &WriteRelationshipsRequest,
276    ) -> Result<WriteRelationshipsResponse, SpiceDBError> {
277        memory_transport::write_relationships(self.handle, request)
278            .map_err(|e| SpiceDBError::SpiceDB(e.0))
279    }
280
281    /// `DeleteRelationships`.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the FFI call fails or the response cannot be decoded.
286    pub fn delete_relationships(
287        &self,
288        request: &DeleteRelationshipsRequest,
289    ) -> Result<DeleteRelationshipsResponse, SpiceDBError> {
290        memory_transport::delete_relationships(self.handle, request)
291            .map_err(|e| SpiceDBError::SpiceDB(e.0))
292    }
293
294    /// `CheckBulkPermissions`.
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if the FFI call fails or the response cannot be decoded.
299    pub fn check_bulk_permissions(
300        &self,
301        request: &CheckBulkPermissionsRequest,
302    ) -> Result<CheckBulkPermissionsResponse, SpiceDBError> {
303        memory_transport::check_bulk_permissions(self.handle, request)
304            .map_err(|e| SpiceDBError::SpiceDB(e.0))
305    }
306
307    /// `ExpandPermissionTree`.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the FFI call fails or the response cannot be decoded.
312    pub fn expand_permission_tree(
313        &self,
314        request: &ExpandPermissionTreeRequest,
315    ) -> Result<ExpandPermissionTreeResponse, SpiceDBError> {
316        memory_transport::expand_permission_tree(self.handle, request)
317            .map_err(|e| SpiceDBError::SpiceDB(e.0))
318    }
319}
320
321/// Schema service client for memory transport.
322pub struct MemorySchemaClient {
323    handle: u64,
324}
325
326impl MemorySchemaClient {
327    /// `ReadSchema`.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the FFI call fails or the response cannot be decoded.
332    pub fn read_schema(
333        &self,
334        request: &ReadSchemaRequest,
335    ) -> Result<ReadSchemaResponse, SpiceDBError> {
336        memory_transport::read_schema(self.handle, request).map_err(|e| SpiceDBError::SpiceDB(e.0))
337    }
338
339    /// `WriteSchema`.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if the FFI call fails or the response cannot be decoded.
344    pub fn write_schema(
345        &self,
346        request: &WriteSchemaRequest,
347    ) -> Result<WriteSchemaResponse, SpiceDBError> {
348        memory_transport::write_schema(self.handle, request).map_err(|e| SpiceDBError::SpiceDB(e.0))
349    }
350}
351
352#[cfg(test)]
353mod tests {
354
355    use std::time::Duration;
356
357    use spicedb_grpc_tonic::v1::{
358        CheckPermissionRequest, Consistency, ObjectReference, ReadRelationshipsRequest,
359        Relationship, RelationshipFilter, RelationshipUpdate, SubjectReference, WatchKind,
360        WatchRequest, WriteRelationshipsRequest, relationship_update::Operation,
361        watch_service_client::WatchServiceClient,
362    };
363    use tokio::time::timeout;
364    use tokio_stream::StreamExt;
365    use tonic::transport::{Channel, Endpoint};
366
367    use super::*;
368    use crate::v1::check_permission_response::Permissionship;
369
370    /// Connect to the streaming proxy (addr + transport from C library start response).
371    #[allow(unused_variables)] // transport only used on Unix
372    async fn connect_streaming(
373        addr: &str,
374        transport: &str,
375    ) -> Result<Channel, Box<dyn std::error::Error + Send + Sync>> {
376        #[cfg(unix)]
377        {
378            if transport == "unix" {
379                let path = addr.to_string();
380                Endpoint::try_from("http://[::]:50051")?
381                    .connect_with_connector(tower::service_fn(move |_: tonic::transport::Uri| {
382                        let path = path.clone();
383                        async move {
384                            let stream = tokio::net::UnixStream::connect(&path).await?;
385                            Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream))
386                        }
387                    }))
388                    .await
389                    .map_err(Into::into)
390            } else {
391                Endpoint::from_shared(format!("http://{addr}"))?
392                    .connect()
393                    .await
394                    .map_err(Into::into)
395            }
396        }
397        #[cfg(windows)]
398        {
399            Endpoint::from_shared(format!("http://{addr}"))?
400                .connect()
401                .await
402                .map_err(Into::into)
403        }
404    }
405
406    const TEST_SCHEMA: &str = r"
407definition user {}
408
409definition document {
410    relation reader: user
411    relation writer: user
412
413    permission read = reader + writer
414    permission write = writer
415}
416";
417
418    fn rel(resource: &str, relation: &str, subject: &str) -> Relationship {
419        let (res_type, res_id) = resource.split_once(':').unwrap();
420        let (sub_type, sub_id) = subject.split_once(':').unwrap();
421        Relationship {
422            resource: Some(ObjectReference {
423                object_type: res_type.into(),
424                object_id: res_id.into(),
425            }),
426            relation: relation.into(),
427            subject: Some(SubjectReference {
428                object: Some(ObjectReference {
429                    object_type: sub_type.into(),
430                    object_id: sub_id.into(),
431                }),
432                optional_relation: String::new(),
433            }),
434            optional_caveat: None,
435            optional_expires_at: None,
436        }
437    }
438
439    fn fully_consistent() -> Consistency {
440        Consistency {
441            requirement: Some(crate::v1::consistency::Requirement::FullyConsistent(true)),
442        }
443    }
444
445    fn check_req(resource: &str, permission: &str, subject: &str) -> CheckPermissionRequest {
446        let (res_type, res_id) = resource.split_once(':').unwrap();
447        let (sub_type, sub_id) = subject.split_once(':').unwrap();
448        CheckPermissionRequest {
449            consistency: Some(fully_consistent()),
450            resource: Some(ObjectReference {
451                object_type: res_type.into(),
452                object_id: res_id.into(),
453            }),
454            permission: permission.into(),
455            subject: Some(SubjectReference {
456                object: Some(ObjectReference {
457                    object_type: sub_type.into(),
458                    object_id: sub_id.into(),
459                }),
460                optional_relation: String::new(),
461            }),
462            context: None,
463            with_tracing: false,
464        }
465    }
466
467    /// `EmbeddedSpiceDB::start_with_schema` + `.permissions().check_permission()`. Skipped on bind/streaming proxy errors.
468    #[test]
469    fn test_check_permission() {
470        let relationships = vec![
471            rel("document:readme", "reader", "user:alice"),
472            rel("document:readme", "writer", "user:bob"),
473        ];
474        let spicedb = match EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &relationships, None) {
475            Ok(db) => db,
476            Err(e) => {
477                let msg = e.to_string();
478                if msg.contains("streaming proxy")
479                    || msg.contains("bind")
480                    || msg.contains("operation not permitted")
481                {
482                    return;
483                }
484                panic!("EmbeddedSpiceDB::start_with_schema failed: {e}");
485            }
486        };
487
488        let response = spicedb
489            .permissions()
490            .check_permission(&check_req("document:readme", "read", "user:alice"))
491            .unwrap();
492        assert_eq!(
493            response.permissionship,
494            Permissionship::HasPermission as i32,
495            "alice should have read on document:readme"
496        );
497    }
498
499    /// Verifies the streaming proxy: start Watch stream, write a relationship, receive update on stream. Skipped on bind/proxy errors.
500    #[test]
501    fn test_watch_streaming() {
502        let db = match EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None) {
503            Ok(d) => d,
504            Err(e) => {
505                let msg = e.to_string();
506                if msg.contains("streaming proxy")
507                    || msg.contains("bind")
508                    || msg.contains("operation not permitted")
509                {
510                    return;
511                }
512                panic!("EmbeddedSpiceDB::start_with_schema failed: {e}");
513            }
514        };
515
516        let rt = tokio::runtime::Runtime::new().unwrap();
517        rt.block_on(async {
518            let channel = connect_streaming(db.streaming_address(), db.streaming_transport())
519                .await
520                .unwrap();
521            let mut watch_client = WatchServiceClient::new(channel);
522            // Request checkpoints so the server sends an initial response and keeps the stream alive.
523            let watch_req = WatchRequest {
524                optional_update_kinds: vec![
525                    WatchKind::IncludeRelationshipUpdates.into(),
526                    WatchKind::IncludeCheckpoints.into(),
527                ],
528                ..Default::default()
529            };
530            let mut stream =
531                match timeout(Duration::from_secs(10), watch_client.watch(watch_req)).await {
532                    Ok(Ok(response)) => response.into_inner(),
533                    Ok(Err(e)) => panic!("watch() failed: {e}"),
534                    Err(_) => return,
535                };
536
537            let write_req = WriteRelationshipsRequest {
538                updates: vec![RelationshipUpdate {
539                    operation: Operation::Touch as i32,
540                    relationship: Some(rel("document:watched", "reader", "user:alice")),
541                }],
542                optional_preconditions: vec![],
543                optional_transaction_metadata: None,
544            };
545            db.permissions().write_relationships(&write_req).unwrap();
546
547            let mut received_update = false;
548            let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
549            while tokio::time::Instant::now() < deadline {
550                match timeout(Duration::from_millis(200), stream.next()).await {
551                    Ok(Some(Ok(resp))) => {
552                        if !resp.updates.is_empty() {
553                            received_update = true;
554                            break;
555                        }
556                    }
557                    Ok(Some(Err(e))) => panic!("watch stream error: {e}"),
558                    Ok(None) => break,
559                    Err(_) => {}
560                }
561            }
562            assert!(
563                received_update,
564                "expected at least one Watch response with updates within 3s"
565            );
566        });
567    }
568
569    #[test]
570    fn test_ffi_spicedb() {
571        let relationships = vec![
572            rel("document:readme", "reader", "user:alice"),
573            rel("document:readme", "writer", "user:bob"),
574        ];
575
576        let spicedb =
577            EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &relationships, None).unwrap();
578
579        assert_eq!(
580            spicedb
581                .permissions()
582                .check_permission(&check_req("document:readme", "read", "user:alice"))
583                .unwrap()
584                .permissionship,
585            Permissionship::HasPermission as i32,
586        );
587        assert_eq!(
588            spicedb
589                .permissions()
590                .check_permission(&check_req("document:readme", "write", "user:alice"))
591                .unwrap()
592                .permissionship,
593            Permissionship::NoPermission as i32,
594        );
595        assert_eq!(
596            spicedb
597                .permissions()
598                .check_permission(&check_req("document:readme", "read", "user:bob"))
599                .unwrap()
600                .permissionship,
601            Permissionship::HasPermission as i32,
602        );
603        assert_eq!(
604            spicedb
605                .permissions()
606                .check_permission(&check_req("document:readme", "write", "user:bob"))
607                .unwrap()
608                .permissionship,
609            Permissionship::HasPermission as i32,
610        );
611        assert_eq!(
612            spicedb
613                .permissions()
614                .check_permission(&check_req("document:readme", "read", "user:charlie"))
615                .unwrap()
616                .permissionship,
617            Permissionship::NoPermission as i32,
618        );
619    }
620
621    #[test]
622    fn test_add_relationship() {
623        let spicedb = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
624
625        spicedb
626            .permissions()
627            .write_relationships(&WriteRelationshipsRequest {
628                updates: vec![RelationshipUpdate {
629                    operation: Operation::Touch as i32,
630                    relationship: Some(rel("document:test", "reader", "user:alice")),
631                }],
632                optional_preconditions: vec![],
633                optional_transaction_metadata: None,
634            })
635            .unwrap();
636
637        let r = spicedb
638            .permissions()
639            .check_permission(&check_req("document:test", "read", "user:alice"))
640            .unwrap();
641        assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
642    }
643
644    #[test]
645    fn test_parallel_instances() {
646        let spicedb1 = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
647        let spicedb2 = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
648
649        spicedb1
650            .permissions()
651            .write_relationships(&WriteRelationshipsRequest {
652                updates: vec![RelationshipUpdate {
653                    operation: Operation::Touch as i32,
654                    relationship: Some(rel("document:doc1", "reader", "user:alice")),
655                }],
656                optional_preconditions: vec![],
657                optional_transaction_metadata: None,
658            })
659            .unwrap();
660
661        let r1 = spicedb1
662            .permissions()
663            .check_permission(&check_req("document:doc1", "read", "user:alice"))
664            .unwrap();
665        assert_eq!(r1.permissionship, Permissionship::HasPermission as i32);
666
667        let r2 = spicedb2
668            .permissions()
669            .check_permission(&check_req("document:doc1", "read", "user:alice"))
670            .unwrap();
671        assert_eq!(r2.permissionship, Permissionship::NoPermission as i32);
672    }
673
674    #[tokio::test]
675    async fn test_read_relationships() {
676        let relationships = vec![
677            rel("document:doc1", "reader", "user:alice"),
678            rel("document:doc1", "reader", "user:bob"),
679        ];
680
681        let spicedb =
682            EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &relationships, None).unwrap();
683        let channel = connect_streaming(spicedb.streaming_address(), spicedb.streaming_transport())
684            .await
685            .unwrap();
686        let mut client =
687            spicedb_grpc_tonic::v1::permissions_service_client::PermissionsServiceClient::new(
688                channel,
689            );
690        let mut stream = client
691            .read_relationships(ReadRelationshipsRequest {
692                consistency: Some(fully_consistent()),
693                relationship_filter: Some(RelationshipFilter {
694                    resource_type: "document".into(),
695                    optional_resource_id: "doc1".into(),
696                    optional_resource_id_prefix: String::new(),
697                    optional_relation: String::new(),
698                    optional_subject_filter: None,
699                }),
700                optional_limit: 0,
701                optional_cursor: None,
702            })
703            .await
704            .unwrap()
705            .into_inner();
706
707        let mut count = 0;
708        while let Some(Ok(_)) = stream.next().await {
709            count += 1;
710        }
711        assert_eq!(count, 2);
712    }
713
714    /// Performance test - run with `cargo test perf_ -- --nocapture --ignored`
715    #[test]
716    #[ignore = "performance test - run manually with --ignored flag"]
717    fn perf_check_with_1000_relationships() {
718        const NUM_CHECKS: usize = 100;
719        use std::time::Instant;
720
721        // Create 1000 relationships
722        let relationships: Vec<Relationship> = (0..1000)
723            .map(|i| {
724                rel(
725                    &format!("document:doc{i}"),
726                    "reader",
727                    &format!("user:user{}", i % 100),
728                )
729            })
730            .collect();
731
732        println!("\n=== Performance: Check with 1000 relationships ===");
733
734        let start = Instant::now();
735        let spicedb =
736            EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &relationships, None).unwrap();
737        println!(
738            "Instance creation with 1000 relationships: {:?}",
739            start.elapsed()
740        );
741
742        // Warm up
743        for _ in 0..10 {
744            let _ = spicedb.permissions().check_permission(&check_req(
745                "document:doc0",
746                "read",
747                "user:user0",
748            ));
749        }
750
751        // Benchmark permission checks
752        let start = Instant::now();
753        for i in 0..NUM_CHECKS {
754            let doc = format!("document:doc{}", i % 1000);
755            let user = format!("user:user{}", i % 100);
756            let _ = spicedb
757                .permissions()
758                .check_permission(&check_req(&doc, "read", &user))
759                .unwrap();
760        }
761        let elapsed = start.elapsed();
762
763        let num_checks_u32 = u32::try_from(NUM_CHECKS).unwrap();
764        println!("Total time for {NUM_CHECKS} checks: {elapsed:?}");
765        println!("Average per check: {:?}", elapsed / num_checks_u32);
766        println!(
767            "Checks per second: {:.0}",
768            f64::from(num_checks_u32) / elapsed.as_secs_f64()
769        );
770    }
771
772    /// Performance test for individual relationship additions
773    #[test]
774    #[ignore = "performance test - run manually with --ignored flag"]
775    fn perf_add_individual_relationships() {
776        const NUM_ADDS: usize = 50;
777        use std::time::Instant;
778
779        let spicedb = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
780
781        println!("\n=== Performance: Add individual relationships ===");
782
783        let start = Instant::now();
784        for i in 0..NUM_ADDS {
785            spicedb
786                .permissions()
787                .write_relationships(&WriteRelationshipsRequest {
788                    updates: vec![RelationshipUpdate {
789                        operation: Operation::Touch as i32,
790                        relationship: Some(rel(
791                            &format!("document:perf{i}"),
792                            "reader",
793                            "user:alice",
794                        )),
795                    }],
796                    optional_preconditions: vec![],
797                    optional_transaction_metadata: None,
798                })
799                .unwrap();
800        }
801        let elapsed = start.elapsed();
802
803        let num_adds_u32 = u32::try_from(NUM_ADDS).unwrap();
804        println!("Total time for {NUM_ADDS} individual adds: {elapsed:?}");
805        println!("Average per add: {:?}", elapsed / num_adds_u32);
806        println!(
807            "Adds per second: {:.0}",
808            f64::from(num_adds_u32) / elapsed.as_secs_f64()
809        );
810    }
811
812    /// Performance test for bulk relationship writes
813    #[test]
814    #[ignore = "performance test - run manually with --ignored flag"]
815    fn perf_bulk_write_relationships() {
816        use std::time::Instant;
817
818        let spicedb = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
819
820        println!("\n=== Performance: Bulk write relationships ===");
821
822        // Test different batch sizes
823        for batch_size in [5_i32, 10, 20, 50] {
824            let batch_size_u32 = u32::try_from(batch_size).unwrap();
825            let relationships: Vec<Relationship> = (0..batch_size)
826                .map(|i| {
827                    rel(
828                        &format!("document:bulk{batch_size}_{i}"),
829                        "reader",
830                        "user:alice",
831                    )
832                })
833                .collect();
834
835            let start = Instant::now();
836            spicedb
837                .permissions()
838                .write_relationships(&WriteRelationshipsRequest {
839                    updates: relationships
840                        .iter()
841                        .map(|r| RelationshipUpdate {
842                            operation: Operation::Touch as i32,
843                            relationship: Some(r.clone()),
844                        })
845                        .collect(),
846                    optional_preconditions: vec![],
847                    optional_transaction_metadata: None,
848                })
849                .unwrap();
850            let elapsed = start.elapsed();
851
852            println!(
853                "Batch of {} relationships: {:?} ({:?} per relationship)",
854                batch_size,
855                elapsed,
856                elapsed / batch_size_u32
857            );
858        }
859
860        // Compare: 10 individual vs 10 bulk
861        println!("\n--- Comparison: 10 individual vs 10 bulk ---");
862
863        let spicedb2 = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
864
865        let start = Instant::now();
866        for i in 0..10 {
867            spicedb2
868                .permissions()
869                .write_relationships(&WriteRelationshipsRequest {
870                    updates: vec![RelationshipUpdate {
871                        operation: Operation::Touch as i32,
872                        relationship: Some(rel(
873                            &format!("document:cmp_ind{i}"),
874                            "reader",
875                            "user:bob",
876                        )),
877                    }],
878                    optional_preconditions: vec![],
879                    optional_transaction_metadata: None,
880                })
881                .unwrap();
882        }
883        let individual_time = start.elapsed();
884        println!("10 individual adds: {individual_time:?}");
885
886        let relationships: Vec<Relationship> = (0..10)
887            .map(|i| rel(&format!("document:cmp_bulk{i}"), "reader", "user:bob"))
888            .collect();
889        let start = Instant::now();
890        spicedb2
891            .permissions()
892            .write_relationships(&WriteRelationshipsRequest {
893                updates: relationships
894                    .iter()
895                    .map(|r| RelationshipUpdate {
896                        operation: Operation::Touch as i32,
897                        relationship: Some(r.clone()),
898                    })
899                    .collect(),
900                optional_preconditions: vec![],
901                optional_transaction_metadata: None,
902            })
903            .unwrap();
904        let bulk_time = start.elapsed();
905        println!("10 bulk add: {bulk_time:?}");
906        println!(
907            "Speedup: {:.1}x",
908            individual_time.as_secs_f64() / bulk_time.as_secs_f64()
909        );
910    }
911
912    /// Performance test: 50,000 relationships
913    #[test]
914    #[ignore = "performance test - run manually with --ignored flag"]
915    fn perf_embedded_50k_relationships() {
916        const TOTAL_RELS: usize = 50_000;
917        const BATCH_SIZE: usize = 1000;
918        const NUM_CHECKS: usize = 500;
919        use std::time::Instant;
920
921        println!("\n=== Embedded SpiceDB: 50,000 relationships ===");
922
923        // Create 50,000 relationships in batches (SpiceDB max batch is 1000)
924        println!("Creating instance...");
925        let start = Instant::now();
926        let spicedb = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
927        println!("Instance creation time: {:?}", start.elapsed());
928
929        println!("Adding {TOTAL_RELS} relationships in batches of {BATCH_SIZE}...");
930        let start = Instant::now();
931        for batch_num in 0..(TOTAL_RELS / BATCH_SIZE) {
932            let batch_start = batch_num * BATCH_SIZE;
933            let relationships: Vec<Relationship> = (batch_start..batch_start + BATCH_SIZE)
934                .map(|i| {
935                    rel(
936                        &format!("document:doc{i}"),
937                        "reader",
938                        &format!("user:user{}", i % 1000),
939                    )
940                })
941                .collect();
942            spicedb
943                .permissions()
944                .write_relationships(&WriteRelationshipsRequest {
945                    updates: relationships
946                        .iter()
947                        .map(|r| RelationshipUpdate {
948                            operation: Operation::Touch as i32,
949                            relationship: Some(r.clone()),
950                        })
951                        .collect(),
952                    optional_preconditions: vec![],
953                    optional_transaction_metadata: None,
954                })
955                .unwrap();
956        }
957        println!(
958            "Total time to add {} relationships: {:?}",
959            TOTAL_RELS,
960            start.elapsed()
961        );
962
963        // Warm up
964        for _ in 0..20 {
965            let _ = spicedb.permissions().check_permission(&check_req(
966                "document:doc0",
967                "read",
968                "user:user0",
969            ));
970        }
971
972        // Benchmark permission checks
973        let num_checks_u32 = u32::try_from(NUM_CHECKS).unwrap();
974        let start = Instant::now();
975        for i in 0..NUM_CHECKS {
976            let doc = format!("document:doc{}", i % TOTAL_RELS);
977            let user = format!("user:user{}", i % 1000);
978            let _ = spicedb
979                .permissions()
980                .check_permission(&check_req(&doc, "read", &user))
981                .unwrap();
982        }
983        let elapsed = start.elapsed();
984
985        println!("Total time for {NUM_CHECKS} checks: {elapsed:?}");
986        println!("Average per check: {:?}", elapsed / num_checks_u32);
987        println!(
988            "Checks per second: {:.0}",
989            f64::from(num_checks_u32) / elapsed.as_secs_f64()
990        );
991
992        // Test some negative checks too
993        let start = Instant::now();
994        for i in 0..NUM_CHECKS {
995            let doc = format!("document:doc{}", i % TOTAL_RELS);
996            // user:nonexistent doesn't exist
997            let _ = spicedb
998                .permissions()
999                .check_permission(&check_req(&doc, "read", "user:nonexistent"))
1000                .unwrap();
1001        }
1002        let elapsed = start.elapsed();
1003        println!("\nNegative checks (user not found):");
1004        println!("Average per check: {:?}", elapsed / num_checks_u32);
1005    }
1006
    /// Shared-datastore tests: two embedded servers using the same remote datastore.
    /// Run with: `cargo test datastore_shared`. The tests in this module carry no `#[ignore]`
    /// attribute, so passing `--ignored` would filter all of them out.
    ///
    /// Requires: Docker (linux/amd64 images), and `spicedb` CLI in PATH for migrations.
    /// Only run on `x86_64`: amd64 containers fail on arm64 (exec format error / QEMU).
1012    #[cfg(all(not(target_os = "windows"), target_arch = "x86_64"))]
1013    mod datastore_shared {
1014        /// Platform for testcontainers: use Linux so images work on Windows Docker Desktop.
1015        const LINUX_AMD64: &str = "linux/amd64";
1016        use std::process::Command;
1017
1018        use testcontainers_modules::{
1019            cockroach_db, mysql, postgres,
1020            testcontainers::{
1021                GenericImage, ImageExt,
1022                core::{IntoContainerPort, WaitFor},
1023                runners::AsyncRunner,
1024            },
1025        };
1026
1027        use super::*;
1028        use crate::StartOptions;
1029
1030        /// Run `spicedb datastore migrate head` for the given engine and URI.
1031        /// Returns Ok(()) on success, Err on failure. Fails the test if spicedb not found.
1032        fn run_migrate(engine: &str, uri: &str, extra_args: &[(&str, &str)]) -> Result<(), String> {
1033            let mut cmd = Command::new("spicedb");
1034            cmd.args([
1035                "datastore",
1036                "migrate",
1037                "head",
1038                "--datastore-engine",
1039                engine,
1040                "--datastore-conn-uri",
1041                uri,
1042            ]);
1043            for (k, v) in extra_args {
1044                cmd.arg(format!("--{k}={v}"));
1045            }
1046            let output = cmd
1047                .output()
1048                .map_err(|e| format!("spicedb migrate failed (is spicedb in PATH?): {e}"))?;
1049            if output.status.success() {
1050                Ok(())
1051            } else {
1052                let stderr = String::from_utf8_lossy(&output.stderr);
1053                Err(format!("spicedb migrate failed: {stderr}"))
1054            }
1055        }
1056
1057        /// Two servers, shared datastore: write via server 1, read via server 2.
1058        fn run_shared_datastore_test(datastore: &str, datastore_uri: &str) {
1059            run_migrate(datastore, datastore_uri, &[]).expect("migration must succeed");
1060
1061            let opts = StartOptions {
1062                datastore: Some(datastore.into()),
1063                datastore_uri: Some(datastore_uri.into()),
1064                ..Default::default()
1065            };
1066
1067            let schema = TEST_SCHEMA;
1068            let db1 = EmbeddedSpiceDB::start_with_schema(schema, &[], Some(&opts)).unwrap();
1069            let db2 = EmbeddedSpiceDB::start_with_schema(schema, &[], Some(&opts)).unwrap();
1070
1071            // Write via server 1
1072            db1.permissions()
1073                .write_relationships(&WriteRelationshipsRequest {
1074                    updates: vec![RelationshipUpdate {
1075                        operation: Operation::Touch as i32,
1076                        relationship: Some(rel("document:shared", "reader", "user:alice")),
1077                    }],
1078                    optional_preconditions: vec![],
1079                    optional_transaction_metadata: None,
1080                })
1081                .unwrap();
1082
1083            // Read via server 2 (shared datastore)
1084            let r = db2
1085                .permissions()
1086                .check_permission(&check_req("document:shared", "read", "user:alice"))
1087                .unwrap();
1088            assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
1089        }
1090
1091        #[tokio::test]
1092        async fn datastore_shared_postgres() {
1093            // PostgreSQL 17+ required for xid8 type (SpiceDB add-xid-columns migration)
1094            let container = postgres::Postgres::default()
1095                .with_tag("17")
1096                .with_platform(LINUX_AMD64)
1097                .start()
1098                .await
1099                .unwrap();
1100            let host = container.get_host().await.unwrap();
1101            let port = container.get_host_port_ipv4(5432).await.unwrap();
1102            let uri = format!("postgres://postgres:postgres@{host}:{port}/postgres");
1103            run_shared_datastore_test("postgres", &uri);
1104        }
1105
1106        #[tokio::test]
1107        async fn datastore_shared_cockroachdb() {
1108            let container = cockroach_db::CockroachDb::default()
1109                .with_platform(LINUX_AMD64)
1110                .start()
1111                .await
1112                .unwrap();
1113            let host = container.get_host().await.unwrap();
1114            let port = container.get_host_port_ipv4(26257).await.unwrap();
1115            let uri = format!("postgres://root@{host}:{port}/defaultdb?sslmode=disable");
1116            run_shared_datastore_test("cockroachdb", &uri);
1117        }
1118
1119        #[tokio::test]
1120        async fn datastore_shared_mysql() {
1121            let container = mysql::Mysql::default()
1122                .with_platform(LINUX_AMD64)
1123                .start()
1124                .await
1125                .unwrap();
1126            let host = container.get_host().await.unwrap();
1127            let port = container.get_host_port_ipv4(3306).await.unwrap();
1128            // MySQL: user@tcp(host:port)/db format; parseTime=true required by SpiceDB
1129            let uri = format!("root@tcp({host}:{port})/test?parseTime=true");
1130            run_shared_datastore_test("mysql", &uri);
1131        }
1132
1133        #[tokio::test]
1134        async fn datastore_shared_spanner() {
1135            // roryq/spanner-emulator: creates instance + database on startup via env vars (no gcloud exec)
1136            // Call GenericImage methods (with_exposed_port, with_wait_for) before ImageExt methods (with_platform, with_env_var)
1137            let container = GenericImage::new("roryq/spanner-emulator", "latest")
1138                .with_exposed_port(9010u16.tcp())
1139                .with_wait_for(WaitFor::seconds(5))
1140                .with_platform(LINUX_AMD64)
1141                .with_env_var("SPANNER_PROJECT_ID", "test-project")
1142                .with_env_var("SPANNER_INSTANCE_ID", "test-instance")
1143                .with_env_var("SPANNER_DATABASE_ID", "test-db")
1144                .start()
1145                .await
1146                .unwrap();
1147            let host = container.get_host().await.unwrap();
1148            let port = container.get_host_port_ipv4(9010u16.tcp()).await.unwrap();
1149            let emulator_host = format!("{host}:{port}");
1150            let uri = "projects/test-project/instances/test-instance/databases/test-db".to_string();
1151
1152            run_migrate(
1153                "spanner",
1154                &uri,
1155                &[("datastore-spanner-emulator-host", &emulator_host)],
1156            )
1157            .expect("migration must succeed");
1158
1159            let opts = StartOptions {
1160                datastore: Some("spanner".into()),
1161                datastore_uri: Some(uri),
1162                spanner_emulator_host: Some(emulator_host),
1163                ..Default::default()
1164            };
1165
1166            let schema = TEST_SCHEMA;
1167            let db1 = EmbeddedSpiceDB::start_with_schema(schema, &[], Some(&opts)).unwrap();
1168            let db2 = EmbeddedSpiceDB::start_with_schema(schema, &[], Some(&opts)).unwrap();
1169
1170            db1.permissions()
1171                .write_relationships(&WriteRelationshipsRequest {
1172                    updates: vec![RelationshipUpdate {
1173                        operation: Operation::Touch as i32,
1174                        relationship: Some(rel("document:shared", "reader", "user:alice")),
1175                    }],
1176                    optional_preconditions: vec![],
1177                    optional_transaction_metadata: None,
1178                })
1179                .unwrap();
1180
1181            let r = db2
1182                .permissions()
1183                .check_permission(&check_req("document:shared", "read", "user:alice"))
1184                .unwrap();
1185            assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
1186        }
1187    }
1188}