1use serde::{Deserialize, Serialize};
8use spicedb_embedded_sys::{dispose, start};
9use spicedb_grpc_tonic::v1::{
10 RelationshipUpdate, WriteRelationshipsRequest, WriteSchemaRequest,
11 relationship_update::Operation,
12};
13
14use crate::SpiceDBError;
15
/// JSON envelope decoded by `parse_json_response` from a native call's response string.
#[derive(Debug, Deserialize)]
struct CResponse {
    /// Selects which of `data` / `error` is meaningful.
    success: bool,
    /// Error text for a failed call; `parse_json_response` falls back to
    /// `"unknown error"` when it is absent.
    error: Option<String>,
    /// Payload for a successful call; `parse_json_response` falls back to JSON
    /// `null` when it is absent.
    data: Option<serde_json::Value>,
}
23
/// Options passed to [`EmbeddedSpiceDB::start`].
///
/// The struct is serialized to JSON with snake_case keys and handed to the native
/// `start` call; fields left as `None` are omitted from the JSON entirely.
///
/// NOTE(review): the meaning of each option is defined by the native library and is
/// not visible in this file. The per-field notes below are inferred from the field
/// names and from the tests in this file — confirm against the native side.
#[derive(Debug, Default, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct StartOptions {
    /// Datastore engine name. The tests in this file use `"postgres"`,
    /// `"cockroachdb"`, `"mysql"` and `"spanner"`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub datastore: Option<String>,
    /// Connection URI / connection string for the selected datastore.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub datastore_uri: Option<String>,
    /// Presumably a path to a Spanner credentials file — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spanner_credentials_file: Option<String>,
    /// `host:port` of a Spanner emulator (used this way by the Spanner test in this file).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spanner_emulator_host: Option<String>,
    /// Presumably a table-name prefix for the MySQL datastore — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mysql_table_prefix: Option<String>,
    /// Presumably toggles metrics — TODO confirm default and scope.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metrics_enabled: Option<bool>,
    /// Presumably toggles datastore-level metrics — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub datastore_metrics_enabled: Option<bool>,
    /// Presumably toggles cache metrics — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cache_metrics_enabled: Option<bool>,
    /// Presumably an OTLP (OpenTelemetry) endpoint — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub otlp_endpoint: Option<String>,
    /// Presumably the port the metrics endpoint listens on — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metrics_port: Option<u16>,
    /// Presumably the host/interface the metrics endpoint binds to — TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metrics_host: Option<String>,
}
69
70fn parse_json_response(response_str: &str) -> Result<serde_json::Value, SpiceDBError> {
72 let response: CResponse = serde_json::from_str(response_str)
73 .map_err(|e| SpiceDBError::Protocol(format!("invalid JSON: {e} (raw: {response_str})")))?;
74
75 if response.success {
76 Ok(response.data.unwrap_or(serde_json::Value::Null))
77 } else {
78 Err(SpiceDBError::SpiceDB(
79 response.error.unwrap_or_else(|| "unknown error".into()),
80 ))
81 }
82}
83
84use spicedb_embedded_sys::memory_transport;
85use spicedb_grpc_tonic::v1::{
86 CheckBulkPermissionsRequest, CheckBulkPermissionsResponse, CheckPermissionRequest,
87 CheckPermissionResponse, DeleteRelationshipsRequest, DeleteRelationshipsResponse,
88 ExpandPermissionTreeRequest, ExpandPermissionTreeResponse, ReadSchemaRequest,
89 ReadSchemaResponse, WriteRelationshipsResponse, WriteSchemaResponse,
90};
91
/// A running embedded SpiceDB instance.
///
/// Created by `EmbeddedSpiceDB::start` / `EmbeddedSpiceDB::start_with_schema`; the
/// native instance is disposed when this value is dropped.
#[derive(Debug)]
pub struct EmbeddedSpiceDB {
    /// Instance handle taken from the `handle` field of the start response; passed
    /// to every `memory_transport` call and to `dispose` on drop.
    handle: u64,
    /// Value of `streaming_address` in the start response. The tests in this file
    /// treat it as a Unix socket path when the transport is `"unix"` and as
    /// `host:port` otherwise.
    streaming_address: String,
    /// Value of `streaming_transport` in the start response (e.g. `"unix"`).
    streaming_transport: String,
}
102
// SAFETY: every field of `EmbeddedSpiceDB` (`u64`, `String`, `String`) is itself
// `Send + Sync`, so moving or sharing the Rust value across threads is sound.
// NOTE(review): these impls also promise that native calls keyed by `handle` may be
// issued from any thread, possibly concurrently. That cannot be verified from this
// file — confirm against the native library.
unsafe impl Send for EmbeddedSpiceDB {}
unsafe impl Sync for EmbeddedSpiceDB {}
105
106impl EmbeddedSpiceDB {
107 pub fn start(options: Option<&StartOptions>) -> Result<Self, SpiceDBError> {
116 let opts = options.cloned().unwrap_or_default();
117 let json = serde_json::to_string(&opts)
118 .map_err(|e| SpiceDBError::Protocol(format!("serialize options: {e}")))?;
119 let response_str = start(Some(&json)).map_err(SpiceDBError::Runtime)?;
120 let data = parse_json_response(&response_str)?;
121 let handle = data
122 .get("handle")
123 .and_then(serde_json::Value::as_u64)
124 .ok_or_else(|| SpiceDBError::Protocol("missing handle in start response".into()))?;
125 let streaming_address = data
126 .get("streaming_address")
127 .and_then(serde_json::Value::as_str)
128 .map(String::from)
129 .ok_or_else(|| {
130 SpiceDBError::Protocol("missing streaming_address in start response".into())
131 })?;
132 let streaming_transport = data
133 .get("streaming_transport")
134 .and_then(serde_json::Value::as_str)
135 .map(String::from)
136 .ok_or_else(|| {
137 SpiceDBError::Protocol("missing streaming_transport in start response".into())
138 })?;
139
140 Ok(Self {
141 handle,
142 streaming_address,
143 streaming_transport,
144 })
145 }
146
147 pub fn start_with_schema(
155 schema: &str,
156 relationships: &[spicedb_grpc_tonic::v1::Relationship],
157 options: Option<&StartOptions>,
158 ) -> Result<Self, SpiceDBError> {
159 let db = Self::start(options)?;
160
161 if !schema.is_empty() {
162 memory_transport::write_schema(
163 db.handle,
164 &WriteSchemaRequest {
165 schema: schema.to_string(),
166 },
167 )
168 .map_err(|e| SpiceDBError::SpiceDB(e.0))?;
169 }
170
171 if !relationships.is_empty() {
172 let updates: Vec<RelationshipUpdate> = relationships
173 .iter()
174 .map(|r| RelationshipUpdate {
175 operation: Operation::Touch as i32,
176 relationship: Some(r.clone()),
177 })
178 .collect();
179 memory_transport::write_relationships(
180 db.handle,
181 &WriteRelationshipsRequest {
182 updates,
183 optional_preconditions: vec![],
184 optional_transaction_metadata: None,
185 },
186 )
187 .map_err(|e| SpiceDBError::SpiceDB(e.0))?;
188 }
189
190 Ok(db)
191 }
192
193 #[deprecated(since = "0.6.0", note = "renamed to start_with_schema")]
199 pub fn new(
200 schema: &str,
201 relationships: &[spicedb_grpc_tonic::v1::Relationship],
202 options: Option<&StartOptions>,
203 ) -> Result<Self, SpiceDBError> {
204 Self::start_with_schema(schema, relationships, options)
205 }
206
207 #[must_use]
209 pub const fn permissions(&self) -> MemoryPermissionsClient {
210 MemoryPermissionsClient {
211 handle: self.handle,
212 }
213 }
214
215 #[must_use]
217 pub const fn schema(&self) -> MemorySchemaClient {
218 MemorySchemaClient {
219 handle: self.handle,
220 }
221 }
222
223 #[must_use]
225 pub const fn handle(&self) -> u64 {
226 self.handle
227 }
228
229 #[must_use]
232 pub fn streaming_address(&self) -> &str {
233 &self.streaming_address
234 }
235
236 #[must_use]
238 pub fn streaming_transport(&self) -> &str {
239 &self.streaming_transport
240 }
241}
242
243impl Drop for EmbeddedSpiceDB {
244 fn drop(&mut self) {
245 let _ = dispose(self.handle);
246 }
247}
248
/// Permissions-service client that talks to an embedded instance through
/// `memory_transport`, addressed by the instance's raw handle.
#[derive(Debug)]
pub struct MemoryPermissionsClient {
    /// Handle of the embedded instance every request is sent to.
    handle: u64,
}
253
254impl MemoryPermissionsClient {
255 pub fn check_permission(
261 &self,
262 request: &CheckPermissionRequest,
263 ) -> Result<CheckPermissionResponse, SpiceDBError> {
264 memory_transport::check_permission(self.handle, request)
265 .map_err(|e| SpiceDBError::SpiceDB(e.0))
266 }
267
268 pub fn write_relationships(
274 &self,
275 request: &WriteRelationshipsRequest,
276 ) -> Result<WriteRelationshipsResponse, SpiceDBError> {
277 memory_transport::write_relationships(self.handle, request)
278 .map_err(|e| SpiceDBError::SpiceDB(e.0))
279 }
280
281 pub fn delete_relationships(
287 &self,
288 request: &DeleteRelationshipsRequest,
289 ) -> Result<DeleteRelationshipsResponse, SpiceDBError> {
290 memory_transport::delete_relationships(self.handle, request)
291 .map_err(|e| SpiceDBError::SpiceDB(e.0))
292 }
293
294 pub fn check_bulk_permissions(
300 &self,
301 request: &CheckBulkPermissionsRequest,
302 ) -> Result<CheckBulkPermissionsResponse, SpiceDBError> {
303 memory_transport::check_bulk_permissions(self.handle, request)
304 .map_err(|e| SpiceDBError::SpiceDB(e.0))
305 }
306
307 pub fn expand_permission_tree(
313 &self,
314 request: &ExpandPermissionTreeRequest,
315 ) -> Result<ExpandPermissionTreeResponse, SpiceDBError> {
316 memory_transport::expand_permission_tree(self.handle, request)
317 .map_err(|e| SpiceDBError::SpiceDB(e.0))
318 }
319}
320
/// Schema-service client that talks to an embedded instance through
/// `memory_transport`, addressed by the instance's raw handle.
#[derive(Debug)]
pub struct MemorySchemaClient {
    /// Handle of the embedded instance every request is sent to.
    handle: u64,
}
325
326impl MemorySchemaClient {
327 pub fn read_schema(
333 &self,
334 request: &ReadSchemaRequest,
335 ) -> Result<ReadSchemaResponse, SpiceDBError> {
336 memory_transport::read_schema(self.handle, request).map_err(|e| SpiceDBError::SpiceDB(e.0))
337 }
338
339 pub fn write_schema(
345 &self,
346 request: &WriteSchemaRequest,
347 ) -> Result<WriteSchemaResponse, SpiceDBError> {
348 memory_transport::write_schema(self.handle, request).map_err(|e| SpiceDBError::SpiceDB(e.0))
349 }
350}
351
352#[cfg(test)]
353mod tests {
354
355 use std::time::Duration;
356
357 use spicedb_grpc_tonic::v1::{
358 CheckPermissionRequest, Consistency, ObjectReference, ReadRelationshipsRequest,
359 Relationship, RelationshipFilter, RelationshipUpdate, SubjectReference, WatchKind,
360 WatchRequest, WriteRelationshipsRequest, relationship_update::Operation,
361 watch_service_client::WatchServiceClient,
362 };
363 use tokio::time::timeout;
364 use tokio_stream::StreamExt;
365 use tonic::transport::{Channel, Endpoint};
366
367 use super::*;
368 use crate::v1::check_permission_response::Permissionship;
369
370 #[allow(unused_variables)] async fn connect_streaming(
373 addr: &str,
374 transport: &str,
375 ) -> Result<Channel, Box<dyn std::error::Error + Send + Sync>> {
376 #[cfg(unix)]
377 {
378 if transport == "unix" {
379 let path = addr.to_string();
380 Endpoint::try_from("http://[::]:50051")?
381 .connect_with_connector(tower::service_fn(move |_: tonic::transport::Uri| {
382 let path = path.clone();
383 async move {
384 let stream = tokio::net::UnixStream::connect(&path).await?;
385 Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream))
386 }
387 }))
388 .await
389 .map_err(Into::into)
390 } else {
391 Endpoint::from_shared(format!("http://{addr}"))?
392 .connect()
393 .await
394 .map_err(Into::into)
395 }
396 }
397 #[cfg(windows)]
398 {
399 Endpoint::from_shared(format!("http://{addr}"))?
400 .connect()
401 .await
402 .map_err(Into::into)
403 }
404 }
405
    /// Schema shared by the tests: `document` has `reader` and `writer` relations
    /// to `user`; `read` is granted to readers and writers, `write` to writers only.
    const TEST_SCHEMA: &str = r"
definition user {}

definition document {
    relation reader: user
    relation writer: user

    permission read = reader + writer
    permission write = writer
}
";
417
418 fn rel(resource: &str, relation: &str, subject: &str) -> Relationship {
419 let (res_type, res_id) = resource.split_once(':').unwrap();
420 let (sub_type, sub_id) = subject.split_once(':').unwrap();
421 Relationship {
422 resource: Some(ObjectReference {
423 object_type: res_type.into(),
424 object_id: res_id.into(),
425 }),
426 relation: relation.into(),
427 subject: Some(SubjectReference {
428 object: Some(ObjectReference {
429 object_type: sub_type.into(),
430 object_id: sub_id.into(),
431 }),
432 optional_relation: String::new(),
433 }),
434 optional_caveat: None,
435 optional_expires_at: None,
436 }
437 }
438
439 fn fully_consistent() -> Consistency {
440 Consistency {
441 requirement: Some(crate::v1::consistency::Requirement::FullyConsistent(true)),
442 }
443 }
444
445 fn check_req(resource: &str, permission: &str, subject: &str) -> CheckPermissionRequest {
446 let (res_type, res_id) = resource.split_once(':').unwrap();
447 let (sub_type, sub_id) = subject.split_once(':').unwrap();
448 CheckPermissionRequest {
449 consistency: Some(fully_consistent()),
450 resource: Some(ObjectReference {
451 object_type: res_type.into(),
452 object_id: res_id.into(),
453 }),
454 permission: permission.into(),
455 subject: Some(SubjectReference {
456 object: Some(ObjectReference {
457 object_type: sub_type.into(),
458 object_id: sub_id.into(),
459 }),
460 optional_relation: String::new(),
461 }),
462 context: None,
463 with_tracing: false,
464 }
465 }
466
467 #[test]
469 fn test_check_permission() {
470 let relationships = vec![
471 rel("document:readme", "reader", "user:alice"),
472 rel("document:readme", "writer", "user:bob"),
473 ];
474 let spicedb = match EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &relationships, None) {
475 Ok(db) => db,
476 Err(e) => {
477 let msg = e.to_string();
478 if msg.contains("streaming proxy")
479 || msg.contains("bind")
480 || msg.contains("operation not permitted")
481 {
482 return;
483 }
484 panic!("EmbeddedSpiceDB::start_with_schema failed: {e}");
485 }
486 };
487
488 let response = spicedb
489 .permissions()
490 .check_permission(&check_req("document:readme", "read", "user:alice"))
491 .unwrap();
492 assert_eq!(
493 response.permissionship,
494 Permissionship::HasPermission as i32,
495 "alice should have read on document:readme"
496 );
497 }
498
499 #[test]
501 fn test_watch_streaming() {
502 let db = match EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None) {
503 Ok(d) => d,
504 Err(e) => {
505 let msg = e.to_string();
506 if msg.contains("streaming proxy")
507 || msg.contains("bind")
508 || msg.contains("operation not permitted")
509 {
510 return;
511 }
512 panic!("EmbeddedSpiceDB::start_with_schema failed: {e}");
513 }
514 };
515
516 let rt = tokio::runtime::Runtime::new().unwrap();
517 rt.block_on(async {
518 let channel = connect_streaming(db.streaming_address(), db.streaming_transport())
519 .await
520 .unwrap();
521 let mut watch_client = WatchServiceClient::new(channel);
522 let watch_req = WatchRequest {
524 optional_update_kinds: vec![
525 WatchKind::IncludeRelationshipUpdates.into(),
526 WatchKind::IncludeCheckpoints.into(),
527 ],
528 ..Default::default()
529 };
530 let mut stream =
531 match timeout(Duration::from_secs(10), watch_client.watch(watch_req)).await {
532 Ok(Ok(response)) => response.into_inner(),
533 Ok(Err(e)) => panic!("watch() failed: {e}"),
534 Err(_) => return,
535 };
536
537 let write_req = WriteRelationshipsRequest {
538 updates: vec![RelationshipUpdate {
539 operation: Operation::Touch as i32,
540 relationship: Some(rel("document:watched", "reader", "user:alice")),
541 }],
542 optional_preconditions: vec![],
543 optional_transaction_metadata: None,
544 };
545 db.permissions().write_relationships(&write_req).unwrap();
546
547 let mut received_update = false;
548 let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
549 while tokio::time::Instant::now() < deadline {
550 match timeout(Duration::from_millis(200), stream.next()).await {
551 Ok(Some(Ok(resp))) => {
552 if !resp.updates.is_empty() {
553 received_update = true;
554 break;
555 }
556 }
557 Ok(Some(Err(e))) => panic!("watch stream error: {e}"),
558 Ok(None) => break,
559 Err(_) => {}
560 }
561 }
562 assert!(
563 received_update,
564 "expected at least one Watch response with updates within 3s"
565 );
566 });
567 }
568
569 #[test]
570 fn test_ffi_spicedb() {
571 let relationships = vec![
572 rel("document:readme", "reader", "user:alice"),
573 rel("document:readme", "writer", "user:bob"),
574 ];
575
576 let spicedb =
577 EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &relationships, None).unwrap();
578
579 assert_eq!(
580 spicedb
581 .permissions()
582 .check_permission(&check_req("document:readme", "read", "user:alice"))
583 .unwrap()
584 .permissionship,
585 Permissionship::HasPermission as i32,
586 );
587 assert_eq!(
588 spicedb
589 .permissions()
590 .check_permission(&check_req("document:readme", "write", "user:alice"))
591 .unwrap()
592 .permissionship,
593 Permissionship::NoPermission as i32,
594 );
595 assert_eq!(
596 spicedb
597 .permissions()
598 .check_permission(&check_req("document:readme", "read", "user:bob"))
599 .unwrap()
600 .permissionship,
601 Permissionship::HasPermission as i32,
602 );
603 assert_eq!(
604 spicedb
605 .permissions()
606 .check_permission(&check_req("document:readme", "write", "user:bob"))
607 .unwrap()
608 .permissionship,
609 Permissionship::HasPermission as i32,
610 );
611 assert_eq!(
612 spicedb
613 .permissions()
614 .check_permission(&check_req("document:readme", "read", "user:charlie"))
615 .unwrap()
616 .permissionship,
617 Permissionship::NoPermission as i32,
618 );
619 }
620
621 #[test]
622 fn test_add_relationship() {
623 let spicedb = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
624
625 spicedb
626 .permissions()
627 .write_relationships(&WriteRelationshipsRequest {
628 updates: vec![RelationshipUpdate {
629 operation: Operation::Touch as i32,
630 relationship: Some(rel("document:test", "reader", "user:alice")),
631 }],
632 optional_preconditions: vec![],
633 optional_transaction_metadata: None,
634 })
635 .unwrap();
636
637 let r = spicedb
638 .permissions()
639 .check_permission(&check_req("document:test", "read", "user:alice"))
640 .unwrap();
641 assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
642 }
643
644 #[test]
645 fn test_parallel_instances() {
646 let spicedb1 = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
647 let spicedb2 = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
648
649 spicedb1
650 .permissions()
651 .write_relationships(&WriteRelationshipsRequest {
652 updates: vec![RelationshipUpdate {
653 operation: Operation::Touch as i32,
654 relationship: Some(rel("document:doc1", "reader", "user:alice")),
655 }],
656 optional_preconditions: vec![],
657 optional_transaction_metadata: None,
658 })
659 .unwrap();
660
661 let r1 = spicedb1
662 .permissions()
663 .check_permission(&check_req("document:doc1", "read", "user:alice"))
664 .unwrap();
665 assert_eq!(r1.permissionship, Permissionship::HasPermission as i32);
666
667 let r2 = spicedb2
668 .permissions()
669 .check_permission(&check_req("document:doc1", "read", "user:alice"))
670 .unwrap();
671 assert_eq!(r2.permissionship, Permissionship::NoPermission as i32);
672 }
673
674 #[tokio::test]
675 async fn test_read_relationships() {
676 let relationships = vec![
677 rel("document:doc1", "reader", "user:alice"),
678 rel("document:doc1", "reader", "user:bob"),
679 ];
680
681 let spicedb =
682 EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &relationships, None).unwrap();
683 let channel = connect_streaming(spicedb.streaming_address(), spicedb.streaming_transport())
684 .await
685 .unwrap();
686 let mut client =
687 spicedb_grpc_tonic::v1::permissions_service_client::PermissionsServiceClient::new(
688 channel,
689 );
690 let mut stream = client
691 .read_relationships(ReadRelationshipsRequest {
692 consistency: Some(fully_consistent()),
693 relationship_filter: Some(RelationshipFilter {
694 resource_type: "document".into(),
695 optional_resource_id: "doc1".into(),
696 optional_resource_id_prefix: String::new(),
697 optional_relation: String::new(),
698 optional_subject_filter: None,
699 }),
700 optional_limit: 0,
701 optional_cursor: None,
702 })
703 .await
704 .unwrap()
705 .into_inner();
706
707 let mut count = 0;
708 while let Some(Ok(_)) = stream.next().await {
709 count += 1;
710 }
711 assert_eq!(count, 2);
712 }
713
714 #[test]
716 #[ignore = "performance test - run manually with --ignored flag"]
717 fn perf_check_with_1000_relationships() {
718 const NUM_CHECKS: usize = 100;
719 use std::time::Instant;
720
721 let relationships: Vec<Relationship> = (0..1000)
723 .map(|i| {
724 rel(
725 &format!("document:doc{i}"),
726 "reader",
727 &format!("user:user{}", i % 100),
728 )
729 })
730 .collect();
731
732 println!("\n=== Performance: Check with 1000 relationships ===");
733
734 let start = Instant::now();
735 let spicedb =
736 EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &relationships, None).unwrap();
737 println!(
738 "Instance creation with 1000 relationships: {:?}",
739 start.elapsed()
740 );
741
742 for _ in 0..10 {
744 let _ = spicedb.permissions().check_permission(&check_req(
745 "document:doc0",
746 "read",
747 "user:user0",
748 ));
749 }
750
751 let start = Instant::now();
753 for i in 0..NUM_CHECKS {
754 let doc = format!("document:doc{}", i % 1000);
755 let user = format!("user:user{}", i % 100);
756 let _ = spicedb
757 .permissions()
758 .check_permission(&check_req(&doc, "read", &user))
759 .unwrap();
760 }
761 let elapsed = start.elapsed();
762
763 let num_checks_u32 = u32::try_from(NUM_CHECKS).unwrap();
764 println!("Total time for {NUM_CHECKS} checks: {elapsed:?}");
765 println!("Average per check: {:?}", elapsed / num_checks_u32);
766 println!(
767 "Checks per second: {:.0}",
768 f64::from(num_checks_u32) / elapsed.as_secs_f64()
769 );
770 }
771
772 #[test]
774 #[ignore = "performance test - run manually with --ignored flag"]
775 fn perf_add_individual_relationships() {
776 const NUM_ADDS: usize = 50;
777 use std::time::Instant;
778
779 let spicedb = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
780
781 println!("\n=== Performance: Add individual relationships ===");
782
783 let start = Instant::now();
784 for i in 0..NUM_ADDS {
785 spicedb
786 .permissions()
787 .write_relationships(&WriteRelationshipsRequest {
788 updates: vec![RelationshipUpdate {
789 operation: Operation::Touch as i32,
790 relationship: Some(rel(
791 &format!("document:perf{i}"),
792 "reader",
793 "user:alice",
794 )),
795 }],
796 optional_preconditions: vec![],
797 optional_transaction_metadata: None,
798 })
799 .unwrap();
800 }
801 let elapsed = start.elapsed();
802
803 let num_adds_u32 = u32::try_from(NUM_ADDS).unwrap();
804 println!("Total time for {NUM_ADDS} individual adds: {elapsed:?}");
805 println!("Average per add: {:?}", elapsed / num_adds_u32);
806 println!(
807 "Adds per second: {:.0}",
808 f64::from(num_adds_u32) / elapsed.as_secs_f64()
809 );
810 }
811
812 #[test]
814 #[ignore = "performance test - run manually with --ignored flag"]
815 fn perf_bulk_write_relationships() {
816 use std::time::Instant;
817
818 let spicedb = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
819
820 println!("\n=== Performance: Bulk write relationships ===");
821
822 for batch_size in [5_i32, 10, 20, 50] {
824 let batch_size_u32 = u32::try_from(batch_size).unwrap();
825 let relationships: Vec<Relationship> = (0..batch_size)
826 .map(|i| {
827 rel(
828 &format!("document:bulk{batch_size}_{i}"),
829 "reader",
830 "user:alice",
831 )
832 })
833 .collect();
834
835 let start = Instant::now();
836 spicedb
837 .permissions()
838 .write_relationships(&WriteRelationshipsRequest {
839 updates: relationships
840 .iter()
841 .map(|r| RelationshipUpdate {
842 operation: Operation::Touch as i32,
843 relationship: Some(r.clone()),
844 })
845 .collect(),
846 optional_preconditions: vec![],
847 optional_transaction_metadata: None,
848 })
849 .unwrap();
850 let elapsed = start.elapsed();
851
852 println!(
853 "Batch of {} relationships: {:?} ({:?} per relationship)",
854 batch_size,
855 elapsed,
856 elapsed / batch_size_u32
857 );
858 }
859
860 println!("\n--- Comparison: 10 individual vs 10 bulk ---");
862
863 let spicedb2 = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
864
865 let start = Instant::now();
866 for i in 0..10 {
867 spicedb2
868 .permissions()
869 .write_relationships(&WriteRelationshipsRequest {
870 updates: vec![RelationshipUpdate {
871 operation: Operation::Touch as i32,
872 relationship: Some(rel(
873 &format!("document:cmp_ind{i}"),
874 "reader",
875 "user:bob",
876 )),
877 }],
878 optional_preconditions: vec![],
879 optional_transaction_metadata: None,
880 })
881 .unwrap();
882 }
883 let individual_time = start.elapsed();
884 println!("10 individual adds: {individual_time:?}");
885
886 let relationships: Vec<Relationship> = (0..10)
887 .map(|i| rel(&format!("document:cmp_bulk{i}"), "reader", "user:bob"))
888 .collect();
889 let start = Instant::now();
890 spicedb2
891 .permissions()
892 .write_relationships(&WriteRelationshipsRequest {
893 updates: relationships
894 .iter()
895 .map(|r| RelationshipUpdate {
896 operation: Operation::Touch as i32,
897 relationship: Some(r.clone()),
898 })
899 .collect(),
900 optional_preconditions: vec![],
901 optional_transaction_metadata: None,
902 })
903 .unwrap();
904 let bulk_time = start.elapsed();
905 println!("10 bulk add: {bulk_time:?}");
906 println!(
907 "Speedup: {:.1}x",
908 individual_time.as_secs_f64() / bulk_time.as_secs_f64()
909 );
910 }
911
912 #[test]
914 #[ignore = "performance test - run manually with --ignored flag"]
915 fn perf_embedded_50k_relationships() {
916 const TOTAL_RELS: usize = 50_000;
917 const BATCH_SIZE: usize = 1000;
918 const NUM_CHECKS: usize = 500;
919 use std::time::Instant;
920
921 println!("\n=== Embedded SpiceDB: 50,000 relationships ===");
922
923 println!("Creating instance...");
925 let start = Instant::now();
926 let spicedb = EmbeddedSpiceDB::start_with_schema(TEST_SCHEMA, &[], None).unwrap();
927 println!("Instance creation time: {:?}", start.elapsed());
928
929 println!("Adding {TOTAL_RELS} relationships in batches of {BATCH_SIZE}...");
930 let start = Instant::now();
931 for batch_num in 0..(TOTAL_RELS / BATCH_SIZE) {
932 let batch_start = batch_num * BATCH_SIZE;
933 let relationships: Vec<Relationship> = (batch_start..batch_start + BATCH_SIZE)
934 .map(|i| {
935 rel(
936 &format!("document:doc{i}"),
937 "reader",
938 &format!("user:user{}", i % 1000),
939 )
940 })
941 .collect();
942 spicedb
943 .permissions()
944 .write_relationships(&WriteRelationshipsRequest {
945 updates: relationships
946 .iter()
947 .map(|r| RelationshipUpdate {
948 operation: Operation::Touch as i32,
949 relationship: Some(r.clone()),
950 })
951 .collect(),
952 optional_preconditions: vec![],
953 optional_transaction_metadata: None,
954 })
955 .unwrap();
956 }
957 println!(
958 "Total time to add {} relationships: {:?}",
959 TOTAL_RELS,
960 start.elapsed()
961 );
962
963 for _ in 0..20 {
965 let _ = spicedb.permissions().check_permission(&check_req(
966 "document:doc0",
967 "read",
968 "user:user0",
969 ));
970 }
971
972 let num_checks_u32 = u32::try_from(NUM_CHECKS).unwrap();
974 let start = Instant::now();
975 for i in 0..NUM_CHECKS {
976 let doc = format!("document:doc{}", i % TOTAL_RELS);
977 let user = format!("user:user{}", i % 1000);
978 let _ = spicedb
979 .permissions()
980 .check_permission(&check_req(&doc, "read", &user))
981 .unwrap();
982 }
983 let elapsed = start.elapsed();
984
985 println!("Total time for {NUM_CHECKS} checks: {elapsed:?}");
986 println!("Average per check: {:?}", elapsed / num_checks_u32);
987 println!(
988 "Checks per second: {:.0}",
989 f64::from(num_checks_u32) / elapsed.as_secs_f64()
990 );
991
992 let start = Instant::now();
994 for i in 0..NUM_CHECKS {
995 let doc = format!("document:doc{}", i % TOTAL_RELS);
996 let _ = spicedb
998 .permissions()
999 .check_permission(&check_req(&doc, "read", "user:nonexistent"))
1000 .unwrap();
1001 }
1002 let elapsed = start.elapsed();
1003 println!("\nNegative checks (user not found):");
1004 println!("Average per check: {:?}", elapsed / num_checks_u32);
1005 }
1006
1007 #[cfg(all(not(target_os = "windows"), target_arch = "x86_64"))]
1013 mod datastore_shared {
        /// Platform string passed to `with_platform` for every test container in
        /// this module.
        const LINUX_AMD64: &str = "linux/amd64";
1016 use std::process::Command;
1017
1018 use testcontainers_modules::{
1019 cockroach_db, mysql, postgres,
1020 testcontainers::{
1021 GenericImage, ImageExt,
1022 core::{IntoContainerPort, WaitFor},
1023 runners::AsyncRunner,
1024 },
1025 };
1026
1027 use super::*;
1028 use crate::StartOptions;
1029
1030 fn run_migrate(engine: &str, uri: &str, extra_args: &[(&str, &str)]) -> Result<(), String> {
1033 let mut cmd = Command::new("spicedb");
1034 cmd.args([
1035 "datastore",
1036 "migrate",
1037 "head",
1038 "--datastore-engine",
1039 engine,
1040 "--datastore-conn-uri",
1041 uri,
1042 ]);
1043 for (k, v) in extra_args {
1044 cmd.arg(format!("--{k}={v}"));
1045 }
1046 let output = cmd
1047 .output()
1048 .map_err(|e| format!("spicedb migrate failed (is spicedb in PATH?): {e}"))?;
1049 if output.status.success() {
1050 Ok(())
1051 } else {
1052 let stderr = String::from_utf8_lossy(&output.stderr);
1053 Err(format!("spicedb migrate failed: {stderr}"))
1054 }
1055 }
1056
1057 fn run_shared_datastore_test(datastore: &str, datastore_uri: &str) {
1059 run_migrate(datastore, datastore_uri, &[]).expect("migration must succeed");
1060
1061 let opts = StartOptions {
1062 datastore: Some(datastore.into()),
1063 datastore_uri: Some(datastore_uri.into()),
1064 ..Default::default()
1065 };
1066
1067 let schema = TEST_SCHEMA;
1068 let db1 = EmbeddedSpiceDB::start_with_schema(schema, &[], Some(&opts)).unwrap();
1069 let db2 = EmbeddedSpiceDB::start_with_schema(schema, &[], Some(&opts)).unwrap();
1070
1071 db1.permissions()
1073 .write_relationships(&WriteRelationshipsRequest {
1074 updates: vec![RelationshipUpdate {
1075 operation: Operation::Touch as i32,
1076 relationship: Some(rel("document:shared", "reader", "user:alice")),
1077 }],
1078 optional_preconditions: vec![],
1079 optional_transaction_metadata: None,
1080 })
1081 .unwrap();
1082
1083 let r = db2
1085 .permissions()
1086 .check_permission(&check_req("document:shared", "read", "user:alice"))
1087 .unwrap();
1088 assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
1089 }
1090
1091 #[tokio::test]
1092 async fn datastore_shared_postgres() {
1093 let container = postgres::Postgres::default()
1095 .with_tag("17")
1096 .with_platform(LINUX_AMD64)
1097 .start()
1098 .await
1099 .unwrap();
1100 let host = container.get_host().await.unwrap();
1101 let port = container.get_host_port_ipv4(5432).await.unwrap();
1102 let uri = format!("postgres://postgres:postgres@{host}:{port}/postgres");
1103 run_shared_datastore_test("postgres", &uri);
1104 }
1105
1106 #[tokio::test]
1107 async fn datastore_shared_cockroachdb() {
1108 let container = cockroach_db::CockroachDb::default()
1109 .with_platform(LINUX_AMD64)
1110 .start()
1111 .await
1112 .unwrap();
1113 let host = container.get_host().await.unwrap();
1114 let port = container.get_host_port_ipv4(26257).await.unwrap();
1115 let uri = format!("postgres://root@{host}:{port}/defaultdb?sslmode=disable");
1116 run_shared_datastore_test("cockroachdb", &uri);
1117 }
1118
1119 #[tokio::test]
1120 async fn datastore_shared_mysql() {
1121 let container = mysql::Mysql::default()
1122 .with_platform(LINUX_AMD64)
1123 .start()
1124 .await
1125 .unwrap();
1126 let host = container.get_host().await.unwrap();
1127 let port = container.get_host_port_ipv4(3306).await.unwrap();
1128 let uri = format!("root@tcp({host}:{port})/test?parseTime=true");
1130 run_shared_datastore_test("mysql", &uri);
1131 }
1132
1133 #[tokio::test]
1134 async fn datastore_shared_spanner() {
1135 let container = GenericImage::new("roryq/spanner-emulator", "latest")
1138 .with_exposed_port(9010u16.tcp())
1139 .with_wait_for(WaitFor::seconds(5))
1140 .with_platform(LINUX_AMD64)
1141 .with_env_var("SPANNER_PROJECT_ID", "test-project")
1142 .with_env_var("SPANNER_INSTANCE_ID", "test-instance")
1143 .with_env_var("SPANNER_DATABASE_ID", "test-db")
1144 .start()
1145 .await
1146 .unwrap();
1147 let host = container.get_host().await.unwrap();
1148 let port = container.get_host_port_ipv4(9010u16.tcp()).await.unwrap();
1149 let emulator_host = format!("{host}:{port}");
1150 let uri = "projects/test-project/instances/test-instance/databases/test-db".to_string();
1151
1152 run_migrate(
1153 "spanner",
1154 &uri,
1155 &[("datastore-spanner-emulator-host", &emulator_host)],
1156 )
1157 .expect("migration must succeed");
1158
1159 let opts = StartOptions {
1160 datastore: Some("spanner".into()),
1161 datastore_uri: Some(uri),
1162 spanner_emulator_host: Some(emulator_host),
1163 ..Default::default()
1164 };
1165
1166 let schema = TEST_SCHEMA;
1167 let db1 = EmbeddedSpiceDB::start_with_schema(schema, &[], Some(&opts)).unwrap();
1168 let db2 = EmbeddedSpiceDB::start_with_schema(schema, &[], Some(&opts)).unwrap();
1169
1170 db1.permissions()
1171 .write_relationships(&WriteRelationshipsRequest {
1172 updates: vec![RelationshipUpdate {
1173 operation: Operation::Touch as i32,
1174 relationship: Some(rel("document:shared", "reader", "user:alice")),
1175 }],
1176 optional_preconditions: vec![],
1177 optional_transaction_metadata: None,
1178 })
1179 .unwrap();
1180
1181 let r = db2
1182 .permissions()
1183 .check_permission(&check_req("document:shared", "read", "user:alice"))
1184 .unwrap();
1185 assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
1186 }
1187 }
1188}