1use serde::{Deserialize, Serialize};
8use spicedb_embedded_sys::{dispose, start};
9use spicedb_grpc_tonic::v1::{
10 RelationshipUpdate, WriteRelationshipsRequest, WriteSchemaRequest,
11 relationship_update::Operation,
12};
13
14use crate::SpiceDBError;
15
/// JSON envelope deserialized from the string returned by the embedded runtime.
///
/// `parse_json_response` converts this envelope into a `Result`.
#[derive(Debug, Deserialize)]
struct CResponse {
    /// `true` when the call succeeded; decides whether `data` or `error` is used.
    success: bool,
    /// Failure description; when absent, `parse_json_response` substitutes "unknown error".
    error: Option<String>,
    /// Payload of a successful call; when absent, `parse_json_response` yields JSON `null`.
    data: Option<serde_json::Value>,
}
23
/// Options serialized to JSON and passed to `start` by `EmbeddedSpiceDB::new`.
///
/// Every field is optional; a field left as `None` is omitted from the JSON entirely.
/// `StartOptions::default()` therefore serializes to an empty object.
#[derive(Debug, Default, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct StartOptions {
    /// Datastore engine name. The tests in this file pass "postgres", "cockroachdb",
    /// "mysql" and "spanner"; the full set of accepted values is defined by the runtime.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub datastore: Option<String>,
    /// Connection URI for the selected datastore.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub datastore_uri: Option<String>,
    /// Presumably a path to a Spanner credentials file - TODO confirm against the runtime.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spanner_credentials_file: Option<String>,
    /// Spanner emulator address; the Spanner test in this file passes it as `host:port`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spanner_emulator_host: Option<String>,
    /// Presumably a table-name prefix for the MySQL datastore - TODO confirm against the runtime.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mysql_table_prefix: Option<String>,
    /// Presumably toggles metrics in the runtime - TODO confirm against the runtime.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metrics_enabled: Option<bool>,
}
47
48fn parse_json_response(response_str: &str) -> Result<serde_json::Value, SpiceDBError> {
50 let response: CResponse = serde_json::from_str(response_str)
51 .map_err(|e| SpiceDBError::Protocol(format!("invalid JSON: {e} (raw: {response_str})")))?;
52
53 if response.success {
54 Ok(response.data.unwrap_or(serde_json::Value::Null))
55 } else {
56 Err(SpiceDBError::SpiceDB(
57 response.error.unwrap_or_else(|| "unknown error".into()),
58 ))
59 }
60}
61
62use spicedb_embedded_sys::memory_transport;
63use spicedb_grpc_tonic::v1::{
64 CheckBulkPermissionsRequest, CheckBulkPermissionsResponse, CheckPermissionRequest,
65 CheckPermissionResponse, DeleteRelationshipsRequest, DeleteRelationshipsResponse,
66 ExpandPermissionTreeRequest, ExpandPermissionTreeResponse, ReadSchemaRequest,
67 ReadSchemaResponse, WriteRelationshipsResponse, WriteSchemaResponse,
68};
69
/// Handle to one embedded SpiceDB instance.
///
/// Created by `EmbeddedSpiceDB::new`; dropping the value calls `dispose` on the handle.
pub struct EmbeddedSpiceDB {
    /// Numeric instance identifier taken from the `handle` entry of the start response.
    handle: u64,
    /// Value of the `streaming_address` entry of the start response.
    streaming_address: String,
    /// Value of the `streaming_transport` entry of the start response
    /// (the test helper in this file special-cases the value "unix").
    streaming_transport: String,
}
80
81unsafe impl Send for EmbeddedSpiceDB {}
82unsafe impl Sync for EmbeddedSpiceDB {}
83
84impl EmbeddedSpiceDB {
85 pub fn new(
93 schema: &str,
94 relationships: &[spicedb_grpc_tonic::v1::Relationship],
95 options: Option<&StartOptions>,
96 ) -> Result<Self, SpiceDBError> {
97 let opts = options.cloned().unwrap_or_default();
98 let json = serde_json::to_string(&opts)
99 .map_err(|e| SpiceDBError::Protocol(format!("serialize options: {e}")))?;
100 let response_str = start(Some(&json)).map_err(SpiceDBError::Runtime)?;
101 let data = parse_json_response(&response_str)?;
102 let handle = data
103 .get("handle")
104 .and_then(serde_json::Value::as_u64)
105 .ok_or_else(|| SpiceDBError::Protocol("missing handle in start response".into()))?;
106 let streaming_address = data
107 .get("streaming_address")
108 .and_then(serde_json::Value::as_str)
109 .map(String::from)
110 .ok_or_else(|| {
111 SpiceDBError::Protocol("missing streaming_address in start response".into())
112 })?;
113 let streaming_transport = data
114 .get("streaming_transport")
115 .and_then(serde_json::Value::as_str)
116 .map(String::from)
117 .ok_or_else(|| {
118 SpiceDBError::Protocol("missing streaming_transport in start response".into())
119 })?;
120
121 let db = Self {
122 handle,
123 streaming_address,
124 streaming_transport,
125 };
126
127 if !schema.is_empty() {
128 memory_transport::write_schema(
129 db.handle,
130 &WriteSchemaRequest {
131 schema: schema.to_string(),
132 },
133 )
134 .map_err(|e| SpiceDBError::SpiceDB(e.0))?;
135 }
136
137 if !relationships.is_empty() {
138 let updates: Vec<RelationshipUpdate> = relationships
139 .iter()
140 .map(|r| RelationshipUpdate {
141 operation: Operation::Touch as i32,
142 relationship: Some(r.clone()),
143 })
144 .collect();
145 memory_transport::write_relationships(
146 db.handle,
147 &WriteRelationshipsRequest {
148 updates,
149 optional_preconditions: vec![],
150 optional_transaction_metadata: None,
151 },
152 )
153 .map_err(|e| SpiceDBError::SpiceDB(e.0))?;
154 }
155
156 Ok(db)
157 }
158
159 #[must_use]
161 pub const fn permissions(&self) -> MemoryPermissionsClient {
162 MemoryPermissionsClient {
163 handle: self.handle,
164 }
165 }
166
167 #[must_use]
169 pub const fn schema(&self) -> MemorySchemaClient {
170 MemorySchemaClient {
171 handle: self.handle,
172 }
173 }
174
175 #[must_use]
177 pub const fn handle(&self) -> u64 {
178 self.handle
179 }
180
181 #[must_use]
184 pub fn streaming_address(&self) -> &str {
185 &self.streaming_address
186 }
187
188 #[must_use]
190 pub fn streaming_transport(&self) -> &str {
191 &self.streaming_transport
192 }
193}
194
/// Releases the instance when the wrapper goes out of scope.
impl Drop for EmbeddedSpiceDB {
    fn drop(&mut self) {
        // Best effort: `drop` cannot report failure, so the result of `dispose` is discarded.
        let _ = dispose(self.handle);
    }
}
200
/// Client for permission-related calls, obtained from `EmbeddedSpiceDB::permissions`.
///
/// NOTE(review): the client stores a plain copy of the handle and is not tied to the
/// lifetime of the `EmbeddedSpiceDB` it came from, so it can outlive the instance. How the
/// runtime treats calls on a disposed handle is not visible here - confirm.
pub struct MemoryPermissionsClient {
    /// Handle of the instance every call is forwarded to.
    handle: u64,
}
205
/// Each method forwards its request to the matching `memory_transport` function and maps a
/// transport failure to `SpiceDBError::SpiceDB` carrying the failure's first field.
impl MemoryPermissionsClient {
    /// Forwards a `CheckPermissionRequest` to the instance.
    ///
    /// # Errors
    ///
    /// Returns `SpiceDBError::SpiceDB` when the transport call fails.
    pub fn check_permission(
        &self,
        request: &CheckPermissionRequest,
    ) -> Result<CheckPermissionResponse, SpiceDBError> {
        memory_transport::check_permission(self.handle, request)
            .map_err(|e| SpiceDBError::SpiceDB(e.0))
    }

    /// Forwards a `WriteRelationshipsRequest` to the instance.
    ///
    /// # Errors
    ///
    /// Returns `SpiceDBError::SpiceDB` when the transport call fails.
    pub fn write_relationships(
        &self,
        request: &WriteRelationshipsRequest,
    ) -> Result<WriteRelationshipsResponse, SpiceDBError> {
        memory_transport::write_relationships(self.handle, request)
            .map_err(|e| SpiceDBError::SpiceDB(e.0))
    }

    /// Forwards a `DeleteRelationshipsRequest` to the instance.
    ///
    /// # Errors
    ///
    /// Returns `SpiceDBError::SpiceDB` when the transport call fails.
    pub fn delete_relationships(
        &self,
        request: &DeleteRelationshipsRequest,
    ) -> Result<DeleteRelationshipsResponse, SpiceDBError> {
        memory_transport::delete_relationships(self.handle, request)
            .map_err(|e| SpiceDBError::SpiceDB(e.0))
    }

    /// Forwards a `CheckBulkPermissionsRequest` to the instance.
    ///
    /// # Errors
    ///
    /// Returns `SpiceDBError::SpiceDB` when the transport call fails.
    pub fn check_bulk_permissions(
        &self,
        request: &CheckBulkPermissionsRequest,
    ) -> Result<CheckBulkPermissionsResponse, SpiceDBError> {
        memory_transport::check_bulk_permissions(self.handle, request)
            .map_err(|e| SpiceDBError::SpiceDB(e.0))
    }

    /// Forwards an `ExpandPermissionTreeRequest` to the instance.
    ///
    /// # Errors
    ///
    /// Returns `SpiceDBError::SpiceDB` when the transport call fails.
    pub fn expand_permission_tree(
        &self,
        request: &ExpandPermissionTreeRequest,
    ) -> Result<ExpandPermissionTreeResponse, SpiceDBError> {
        memory_transport::expand_permission_tree(self.handle, request)
            .map_err(|e| SpiceDBError::SpiceDB(e.0))
    }
}
272
/// Client for schema calls, obtained from `EmbeddedSpiceDB::schema`.
///
/// NOTE(review): like the permissions client, this stores a plain copy of the handle and
/// can outlive the instance it came from - confirm how the runtime handles that.
pub struct MemorySchemaClient {
    /// Handle of the instance every call is forwarded to.
    handle: u64,
}
277
/// Each method forwards its request to the matching `memory_transport` function and maps a
/// transport failure to `SpiceDBError::SpiceDB` carrying the failure's first field.
impl MemorySchemaClient {
    /// Forwards a `ReadSchemaRequest` to the instance.
    ///
    /// # Errors
    ///
    /// Returns `SpiceDBError::SpiceDB` when the transport call fails.
    pub fn read_schema(
        &self,
        request: &ReadSchemaRequest,
    ) -> Result<ReadSchemaResponse, SpiceDBError> {
        memory_transport::read_schema(self.handle, request).map_err(|e| SpiceDBError::SpiceDB(e.0))
    }

    /// Forwards a `WriteSchemaRequest` to the instance.
    ///
    /// # Errors
    ///
    /// Returns `SpiceDBError::SpiceDB` when the transport call fails.
    pub fn write_schema(
        &self,
        request: &WriteSchemaRequest,
    ) -> Result<WriteSchemaResponse, SpiceDBError> {
        memory_transport::write_schema(self.handle, request).map_err(|e| SpiceDBError::SpiceDB(e.0))
    }
}
303
304#[cfg(test)]
305mod tests {
306
307 use std::time::Duration;
308
309 use spicedb_grpc_tonic::v1::{
310 CheckPermissionRequest, Consistency, ObjectReference, ReadRelationshipsRequest,
311 Relationship, RelationshipFilter, RelationshipUpdate, SubjectReference, WatchKind,
312 WatchRequest, WriteRelationshipsRequest, relationship_update::Operation,
313 watch_service_client::WatchServiceClient,
314 };
315 use tokio::time::timeout;
316 use tokio_stream::StreamExt;
317 use tonic::transport::{Channel, Endpoint};
318
319 use super::*;
320 use crate::v1::check_permission_response::Permissionship;
321
    /// Opens a tonic `Channel` to the streaming endpoint of an embedded instance.
    ///
    /// On unix, a `transport` of "unix" treats `addr` as a Unix-domain-socket path; any
    /// other transport, and every call on windows, connects to `http://{addr}`.
    // `transport` is only read in the unix branch, hence the lint allowance.
    #[allow(unused_variables)]
    async fn connect_streaming(
        addr: &str,
        transport: &str,
    ) -> Result<Channel, Box<dyn std::error::Error + Send + Sync>> {
        #[cfg(unix)]
        {
            if transport == "unix" {
                let path = addr.to_string();
                // The endpoint URI is a placeholder: the connector below ignores the URI it
                // is given and always dials the socket path.
                Endpoint::try_from("http://[::]:50051")?
                    .connect_with_connector(tower::service_fn(move |_: tonic::transport::Uri| {
                        // Clone per invocation so each returned future owns its own path.
                        let path = path.clone();
                        async move {
                            let stream = tokio::net::UnixStream::connect(&path).await?;
                            Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream))
                        }
                    }))
                    .await
                    .map_err(Into::into)
            } else {
                Endpoint::from_shared(format!("http://{addr}"))?
                    .connect()
                    .await
                    .map_err(Into::into)
            }
        }
        #[cfg(windows)]
        {
            Endpoint::from_shared(format!("http://{addr}"))?
                .connect()
                .await
                .map_err(Into::into)
        }
    }
357
    /// Schema shared by the tests: a `user` type and a `document` type with `reader` and
    /// `writer` relations, where `read` is granted to readers and writers and `write` to
    /// writers only.
    const TEST_SCHEMA: &str = r"
definition user {}

definition document {
    relation reader: user
    relation writer: user

    permission read = reader + writer
    permission write = writer
}
";
369
370 fn rel(resource: &str, relation: &str, subject: &str) -> Relationship {
371 let (res_type, res_id) = resource.split_once(':').unwrap();
372 let (sub_type, sub_id) = subject.split_once(':').unwrap();
373 Relationship {
374 resource: Some(ObjectReference {
375 object_type: res_type.into(),
376 object_id: res_id.into(),
377 }),
378 relation: relation.into(),
379 subject: Some(SubjectReference {
380 object: Some(ObjectReference {
381 object_type: sub_type.into(),
382 object_id: sub_id.into(),
383 }),
384 optional_relation: String::new(),
385 }),
386 optional_caveat: None,
387 optional_expires_at: None,
388 }
389 }
390
391 fn fully_consistent() -> Consistency {
392 Consistency {
393 requirement: Some(crate::v1::consistency::Requirement::FullyConsistent(true)),
394 }
395 }
396
397 fn check_req(resource: &str, permission: &str, subject: &str) -> CheckPermissionRequest {
398 let (res_type, res_id) = resource.split_once(':').unwrap();
399 let (sub_type, sub_id) = subject.split_once(':').unwrap();
400 CheckPermissionRequest {
401 consistency: Some(fully_consistent()),
402 resource: Some(ObjectReference {
403 object_type: res_type.into(),
404 object_id: res_id.into(),
405 }),
406 permission: permission.into(),
407 subject: Some(SubjectReference {
408 object: Some(ObjectReference {
409 object_type: sub_type.into(),
410 object_id: sub_id.into(),
411 }),
412 optional_relation: String::new(),
413 }),
414 context: None,
415 with_tracing: false,
416 }
417 }
418
419 #[test]
421 fn test_check_permission() {
422 let relationships = vec![
423 rel("document:readme", "reader", "user:alice"),
424 rel("document:readme", "writer", "user:bob"),
425 ];
426 let spicedb = match EmbeddedSpiceDB::new(TEST_SCHEMA, &relationships, None) {
427 Ok(db) => db,
428 Err(e) => {
429 let msg = e.to_string();
430 if msg.contains("streaming proxy")
431 || msg.contains("bind")
432 || msg.contains("operation not permitted")
433 {
434 return;
435 }
436 panic!("EmbeddedSpiceDB::new failed: {e}");
437 }
438 };
439
440 let response = spicedb
441 .permissions()
442 .check_permission(&check_req("document:readme", "read", "user:alice"))
443 .unwrap();
444 assert_eq!(
445 response.permissionship,
446 Permissionship::HasPermission as i32,
447 "alice should have read on document:readme"
448 );
449 }
450
    /// Opens a Watch stream, writes one relationship, and expects an update on the stream.
    ///
    /// Returns early, without asserting, when instance creation fails with a message that
    /// mentions "streaming proxy", "bind" or "operation not permitted".
    #[test]
    fn test_watch_streaming() {
        let db = match EmbeddedSpiceDB::new(TEST_SCHEMA, &[], None) {
            Ok(d) => d,
            Err(e) => {
                let msg = e.to_string();
                if msg.contains("streaming proxy")
                    || msg.contains("bind")
                    || msg.contains("operation not permitted")
                {
                    return;
                }
                panic!("EmbeddedSpiceDB::new failed: {e}");
            }
        };

        // A plain `#[test]` has no async runtime, so one is created for the streaming client.
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let channel = connect_streaming(db.streaming_address(), db.streaming_transport())
                .await
                .unwrap();
            let mut watch_client = WatchServiceClient::new(channel);
            let watch_req = WatchRequest {
                optional_update_kinds: vec![
                    WatchKind::IncludeRelationshipUpdates.into(),
                    WatchKind::IncludeCheckpoints.into(),
                ],
                ..Default::default()
            };
            // NOTE(review): if opening the watch takes longer than 10s the test returns
            // here and passes without asserting anything - confirm this is intended.
            let mut stream =
                match timeout(Duration::from_secs(10), watch_client.watch(watch_req)).await {
                    Ok(Ok(response)) => response.into_inner(),
                    Ok(Err(e)) => panic!("watch() failed: {e}"),
                    Err(_) => return,
                };

            // The write happens only after the watch is established.
            let write_req = WriteRelationshipsRequest {
                updates: vec![RelationshipUpdate {
                    operation: Operation::Touch as i32,
                    relationship: Some(rel("document:watched", "reader", "user:alice")),
                }],
                optional_preconditions: vec![],
                optional_transaction_metadata: None,
            };
            db.permissions().write_relationships(&write_req).unwrap();

            // Poll in 200ms slices for up to 3s; responses without updates are skipped.
            let mut received_update = false;
            let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
            while tokio::time::Instant::now() < deadline {
                match timeout(Duration::from_millis(200), stream.next()).await {
                    Ok(Some(Ok(resp))) => {
                        if !resp.updates.is_empty() {
                            received_update = true;
                            break;
                        }
                    }
                    Ok(Some(Err(e))) => panic!("watch stream error: {e}"),
                    // Stream ended; the assertion below then fails.
                    Ok(None) => break,
                    // Slice timed out; keep polling until the deadline.
                    Err(_) => {}
                }
            }
            assert!(
                received_update,
                "expected at least one Watch response with updates within 3s"
            );
        });
    }
520
521 #[test]
522 fn test_ffi_spicedb() {
523 let relationships = vec![
524 rel("document:readme", "reader", "user:alice"),
525 rel("document:readme", "writer", "user:bob"),
526 ];
527
528 let spicedb = EmbeddedSpiceDB::new(TEST_SCHEMA, &relationships, None).unwrap();
529
530 assert_eq!(
531 spicedb
532 .permissions()
533 .check_permission(&check_req("document:readme", "read", "user:alice"))
534 .unwrap()
535 .permissionship,
536 Permissionship::HasPermission as i32,
537 );
538 assert_eq!(
539 spicedb
540 .permissions()
541 .check_permission(&check_req("document:readme", "write", "user:alice"))
542 .unwrap()
543 .permissionship,
544 Permissionship::NoPermission as i32,
545 );
546 assert_eq!(
547 spicedb
548 .permissions()
549 .check_permission(&check_req("document:readme", "read", "user:bob"))
550 .unwrap()
551 .permissionship,
552 Permissionship::HasPermission as i32,
553 );
554 assert_eq!(
555 spicedb
556 .permissions()
557 .check_permission(&check_req("document:readme", "write", "user:bob"))
558 .unwrap()
559 .permissionship,
560 Permissionship::HasPermission as i32,
561 );
562 assert_eq!(
563 spicedb
564 .permissions()
565 .check_permission(&check_req("document:readme", "read", "user:charlie"))
566 .unwrap()
567 .permissionship,
568 Permissionship::NoPermission as i32,
569 );
570 }
571
572 #[test]
573 fn test_add_relationship() {
574 let spicedb = EmbeddedSpiceDB::new(TEST_SCHEMA, &[], None).unwrap();
575
576 spicedb
577 .permissions()
578 .write_relationships(&WriteRelationshipsRequest {
579 updates: vec![RelationshipUpdate {
580 operation: Operation::Touch as i32,
581 relationship: Some(rel("document:test", "reader", "user:alice")),
582 }],
583 optional_preconditions: vec![],
584 optional_transaction_metadata: None,
585 })
586 .unwrap();
587
588 let r = spicedb
589 .permissions()
590 .check_permission(&check_req("document:test", "read", "user:alice"))
591 .unwrap();
592 assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
593 }
594
595 #[test]
596 fn test_parallel_instances() {
597 let spicedb1 = EmbeddedSpiceDB::new(TEST_SCHEMA, &[], None).unwrap();
598 let spicedb2 = EmbeddedSpiceDB::new(TEST_SCHEMA, &[], None).unwrap();
599
600 spicedb1
601 .permissions()
602 .write_relationships(&WriteRelationshipsRequest {
603 updates: vec![RelationshipUpdate {
604 operation: Operation::Touch as i32,
605 relationship: Some(rel("document:doc1", "reader", "user:alice")),
606 }],
607 optional_preconditions: vec![],
608 optional_transaction_metadata: None,
609 })
610 .unwrap();
611
612 let r1 = spicedb1
613 .permissions()
614 .check_permission(&check_req("document:doc1", "read", "user:alice"))
615 .unwrap();
616 assert_eq!(r1.permissionship, Permissionship::HasPermission as i32);
617
618 let r2 = spicedb2
619 .permissions()
620 .check_permission(&check_req("document:doc1", "read", "user:alice"))
621 .unwrap();
622 assert_eq!(r2.permissionship, Permissionship::NoPermission as i32);
623 }
624
625 #[tokio::test]
626 async fn test_read_relationships() {
627 let relationships = vec![
628 rel("document:doc1", "reader", "user:alice"),
629 rel("document:doc1", "reader", "user:bob"),
630 ];
631
632 let spicedb = EmbeddedSpiceDB::new(TEST_SCHEMA, &relationships, None).unwrap();
633 let channel = connect_streaming(spicedb.streaming_address(), spicedb.streaming_transport())
634 .await
635 .unwrap();
636 let mut client =
637 spicedb_grpc_tonic::v1::permissions_service_client::PermissionsServiceClient::new(
638 channel,
639 );
640 let mut stream = client
641 .read_relationships(ReadRelationshipsRequest {
642 consistency: Some(fully_consistent()),
643 relationship_filter: Some(RelationshipFilter {
644 resource_type: "document".into(),
645 optional_resource_id: "doc1".into(),
646 optional_resource_id_prefix: String::new(),
647 optional_relation: String::new(),
648 optional_subject_filter: None,
649 }),
650 optional_limit: 0,
651 optional_cursor: None,
652 })
653 .await
654 .unwrap()
655 .into_inner();
656
657 let mut count = 0;
658 while let Some(Ok(_)) = stream.next().await {
659 count += 1;
660 }
661 assert_eq!(count, 2);
662 }
663
    /// Manual benchmark: times instance creation with 1000 seeded relationships and then
    /// `NUM_CHECKS` permission checks, printing totals, averages and throughput.
    #[test]
    #[ignore = "performance test - run manually with --ignored flag"]
    fn perf_check_with_1000_relationships() {
        const NUM_CHECKS: usize = 100;
        use std::time::Instant;

        // doc{i} is readable by user{i % 100}.
        let relationships: Vec<Relationship> = (0..1000)
            .map(|i| {
                rel(
                    &format!("document:doc{i}"),
                    "reader",
                    &format!("user:user{}", i % 100),
                )
            })
            .collect();

        println!("\n=== Performance: Check with 1000 relationships ===");

        let start = Instant::now();
        let spicedb = EmbeddedSpiceDB::new(TEST_SCHEMA, &relationships, None).unwrap();
        println!(
            "Instance creation with 1000 relationships: {:?}",
            start.elapsed()
        );

        // Untimed warm-up; results are deliberately ignored.
        for _ in 0..10 {
            let _ = spicedb.permissions().check_permission(&check_req(
                "document:doc0",
                "read",
                "user:user0",
            ));
        }

        // Timed section: request construction is included in the measurement.
        let start = Instant::now();
        for i in 0..NUM_CHECKS {
            let doc = format!("document:doc{}", i % 1000);
            let user = format!("user:user{}", i % 100);
            let _ = spicedb
                .permissions()
                .check_permission(&check_req(&doc, "read", &user))
                .unwrap();
        }
        let elapsed = start.elapsed();

        let num_checks_u32 = u32::try_from(NUM_CHECKS).unwrap();
        println!("Total time for {NUM_CHECKS} checks: {elapsed:?}");
        println!("Average per check: {:?}", elapsed / num_checks_u32);
        println!(
            "Checks per second: {:.0}",
            f64::from(num_checks_u32) / elapsed.as_secs_f64()
        );
    }
720
    /// Manual benchmark: times `NUM_ADDS` single-relationship writes against an empty
    /// instance and prints the total, the average and the throughput.
    #[test]
    #[ignore = "performance test - run manually with --ignored flag"]
    fn perf_add_individual_relationships() {
        const NUM_ADDS: usize = 50;
        use std::time::Instant;

        let spicedb = EmbeddedSpiceDB::new(TEST_SCHEMA, &[], None).unwrap();

        println!("\n=== Performance: Add individual relationships ===");

        // Timed section: one write call per relationship, request construction included.
        let start = Instant::now();
        for i in 0..NUM_ADDS {
            spicedb
                .permissions()
                .write_relationships(&WriteRelationshipsRequest {
                    updates: vec![RelationshipUpdate {
                        operation: Operation::Touch as i32,
                        relationship: Some(rel(
                            &format!("document:perf{i}"),
                            "reader",
                            "user:alice",
                        )),
                    }],
                    optional_preconditions: vec![],
                    optional_transaction_metadata: None,
                })
                .unwrap();
        }
        let elapsed = start.elapsed();

        let num_adds_u32 = u32::try_from(NUM_ADDS).unwrap();
        println!("Total time for {NUM_ADDS} individual adds: {elapsed:?}");
        println!("Average per add: {:?}", elapsed / num_adds_u32);
        println!(
            "Adds per second: {:.0}",
            f64::from(num_adds_u32) / elapsed.as_secs_f64()
        );
    }
760
    /// Manual benchmark: times single-call writes of 5, 10, 20 and 50 relationships, then
    /// compares ten individual writes with one ten-relationship write on a fresh instance.
    #[test]
    #[ignore = "performance test - run manually with --ignored flag"]
    fn perf_bulk_write_relationships() {
        use std::time::Instant;

        let spicedb = EmbeddedSpiceDB::new(TEST_SCHEMA, &[], None).unwrap();

        println!("\n=== Performance: Bulk write relationships ===");

        for batch_size in [5_i32, 10, 20, 50] {
            // `Duration` is divided by a `u32` below, hence the conversion.
            let batch_size_u32 = u32::try_from(batch_size).unwrap();
            // Relationships are built outside the timed section; names embed the batch
            // size so batches do not collide.
            let relationships: Vec<Relationship> = (0..batch_size)
                .map(|i| {
                    rel(
                        &format!("document:bulk{batch_size}_{i}"),
                        "reader",
                        "user:alice",
                    )
                })
                .collect();

            // Timed section: includes cloning the relationships into the request.
            let start = Instant::now();
            spicedb
                .permissions()
                .write_relationships(&WriteRelationshipsRequest {
                    updates: relationships
                        .iter()
                        .map(|r| RelationshipUpdate {
                            operation: Operation::Touch as i32,
                            relationship: Some(r.clone()),
                        })
                        .collect(),
                    optional_preconditions: vec![],
                    optional_transaction_metadata: None,
                })
                .unwrap();
            let elapsed = start.elapsed();

            println!(
                "Batch of {} relationships: {:?} ({:?} per relationship)",
                batch_size,
                elapsed,
                elapsed / batch_size_u32
            );
        }

        println!("\n--- Comparison: 10 individual vs 10 bulk ---");

        // Fresh instance for the comparison, separate from the batches written above.
        let spicedb2 = EmbeddedSpiceDB::new(TEST_SCHEMA, &[], None).unwrap();

        // Timed section: ten separate write calls.
        let start = Instant::now();
        for i in 0..10 {
            spicedb2
                .permissions()
                .write_relationships(&WriteRelationshipsRequest {
                    updates: vec![RelationshipUpdate {
                        operation: Operation::Touch as i32,
                        relationship: Some(rel(
                            &format!("document:cmp_ind{i}"),
                            "reader",
                            "user:bob",
                        )),
                    }],
                    optional_preconditions: vec![],
                    optional_transaction_metadata: None,
                })
                .unwrap();
        }
        let individual_time = start.elapsed();
        println!("10 individual adds: {individual_time:?}");

        let relationships: Vec<Relationship> = (0..10)
            .map(|i| rel(&format!("document:cmp_bulk{i}"), "reader", "user:bob"))
            .collect();
        // Timed section: one write call carrying all ten updates.
        let start = Instant::now();
        spicedb2
            .permissions()
            .write_relationships(&WriteRelationshipsRequest {
                updates: relationships
                    .iter()
                    .map(|r| RelationshipUpdate {
                        operation: Operation::Touch as i32,
                        relationship: Some(r.clone()),
                    })
                    .collect(),
                optional_preconditions: vec![],
                optional_transaction_metadata: None,
            })
            .unwrap();
        let bulk_time = start.elapsed();
        println!("10 bulk add: {bulk_time:?}");
        println!(
            "Speedup: {:.1}x",
            individual_time.as_secs_f64() / bulk_time.as_secs_f64()
        );
    }
860
    /// Manual benchmark: loads `TOTAL_RELS` relationships in batches of `BATCH_SIZE`, then
    /// times `NUM_CHECKS` checks that match seeded data and `NUM_CHECKS` checks for a user
    /// that was never written.
    #[test]
    #[ignore = "performance test - run manually with --ignored flag"]
    fn perf_embedded_50k_relationships() {
        const TOTAL_RELS: usize = 50_000;
        const BATCH_SIZE: usize = 1000;
        const NUM_CHECKS: usize = 500;
        use std::time::Instant;

        println!("\n=== Embedded SpiceDB: 50,000 relationships ===");

        println!("Creating instance...");
        let start = Instant::now();
        let spicedb = EmbeddedSpiceDB::new(TEST_SCHEMA, &[], None).unwrap();
        println!("Instance creation time: {:?}", start.elapsed());

        println!("Adding {TOTAL_RELS} relationships in batches of {BATCH_SIZE}...");
        // Timed section: building each batch is included in the measurement.
        let start = Instant::now();
        for batch_num in 0..(TOTAL_RELS / BATCH_SIZE) {
            let batch_start = batch_num * BATCH_SIZE;
            // doc{i} is readable by user{i % 1000}.
            let relationships: Vec<Relationship> = (batch_start..batch_start + BATCH_SIZE)
                .map(|i| {
                    rel(
                        &format!("document:doc{i}"),
                        "reader",
                        &format!("user:user{}", i % 1000),
                    )
                })
                .collect();
            spicedb
                .permissions()
                .write_relationships(&WriteRelationshipsRequest {
                    updates: relationships
                        .iter()
                        .map(|r| RelationshipUpdate {
                            operation: Operation::Touch as i32,
                            relationship: Some(r.clone()),
                        })
                        .collect(),
                    optional_preconditions: vec![],
                    optional_transaction_metadata: None,
                })
                .unwrap();
        }
        println!(
            "Total time to add {} relationships: {:?}",
            TOTAL_RELS,
            start.elapsed()
        );

        // Untimed warm-up; results are deliberately ignored.
        for _ in 0..20 {
            let _ = spicedb.permissions().check_permission(&check_req(
                "document:doc0",
                "read",
                "user:user0",
            ));
        }

        let num_checks_u32 = u32::try_from(NUM_CHECKS).unwrap();
        // Timed section: checks whose document/user pairs match the seeded pattern.
        let start = Instant::now();
        for i in 0..NUM_CHECKS {
            let doc = format!("document:doc{}", i % TOTAL_RELS);
            let user = format!("user:user{}", i % 1000);
            let _ = spicedb
                .permissions()
                .check_permission(&check_req(&doc, "read", &user))
                .unwrap();
        }
        let elapsed = start.elapsed();

        println!("Total time for {NUM_CHECKS} checks: {elapsed:?}");
        println!("Average per check: {:?}", elapsed / num_checks_u32);
        println!(
            "Checks per second: {:.0}",
            f64::from(num_checks_u32) / elapsed.as_secs_f64()
        );

        // Timed section: checks for a subject that was never written.
        let start = Instant::now();
        for i in 0..NUM_CHECKS {
            let doc = format!("document:doc{}", i % TOTAL_RELS);
            let _ = spicedb
                .permissions()
                .check_permission(&check_req(&doc, "read", "user:nonexistent"))
                .unwrap();
        }
        let elapsed = start.elapsed();
        println!("\nNegative checks (user not found):");
        println!("Average per check: {:?}", elapsed / num_checks_u32);
    }
955
956 #[cfg(all(not(target_os = "windows"), target_arch = "x86_64"))]
962 mod datastore_shared {
        /// Platform string passed to `with_platform` for every container in this module.
        const LINUX_AMD64: &str = "linux/amd64";
965 use std::process::Command;
966
967 use testcontainers_modules::{
968 cockroach_db, mysql, postgres,
969 testcontainers::{
970 GenericImage, ImageExt,
971 core::{IntoContainerPort, WaitFor},
972 runners::AsyncRunner,
973 },
974 };
975
976 use super::*;
977 use crate::StartOptions;
978
979 fn run_migrate(engine: &str, uri: &str, extra_args: &[(&str, &str)]) -> Result<(), String> {
982 let mut cmd = Command::new("spicedb");
983 cmd.args([
984 "datastore",
985 "migrate",
986 "head",
987 "--datastore-engine",
988 engine,
989 "--datastore-conn-uri",
990 uri,
991 ]);
992 for (k, v) in extra_args {
993 cmd.arg(format!("--{k}={v}"));
994 }
995 let output = cmd
996 .output()
997 .map_err(|e| format!("spicedb migrate failed (is spicedb in PATH?): {e}"))?;
998 if output.status.success() {
999 Ok(())
1000 } else {
1001 let stderr = String::from_utf8_lossy(&output.stderr);
1002 Err(format!("spicedb migrate failed: {stderr}"))
1003 }
1004 }
1005
1006 fn run_shared_datastore_test(datastore: &str, datastore_uri: &str) {
1008 run_migrate(datastore, datastore_uri, &[]).expect("migration must succeed");
1009
1010 let opts = StartOptions {
1011 datastore: Some(datastore.into()),
1012 datastore_uri: Some(datastore_uri.into()),
1013 ..Default::default()
1014 };
1015
1016 let schema = TEST_SCHEMA;
1017 let db1 = EmbeddedSpiceDB::new(schema, &[], Some(&opts)).unwrap();
1018 let db2 = EmbeddedSpiceDB::new(schema, &[], Some(&opts)).unwrap();
1019
1020 db1.permissions()
1022 .write_relationships(&WriteRelationshipsRequest {
1023 updates: vec![RelationshipUpdate {
1024 operation: Operation::Touch as i32,
1025 relationship: Some(rel("document:shared", "reader", "user:alice")),
1026 }],
1027 optional_preconditions: vec![],
1028 optional_transaction_metadata: None,
1029 })
1030 .unwrap();
1031
1032 let r = db2
1034 .permissions()
1035 .check_permission(&check_req("document:shared", "read", "user:alice"))
1036 .unwrap();
1037 assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
1038 }
1039
1040 #[tokio::test]
1041 async fn datastore_shared_postgres() {
1042 let container = postgres::Postgres::default()
1044 .with_tag("17")
1045 .with_platform(LINUX_AMD64)
1046 .start()
1047 .await
1048 .unwrap();
1049 let host = container.get_host().await.unwrap();
1050 let port = container.get_host_port_ipv4(5432).await.unwrap();
1051 let uri = format!("postgres://postgres:postgres@{host}:{port}/postgres");
1052 run_shared_datastore_test("postgres", &uri);
1053 }
1054
1055 #[tokio::test]
1056 async fn datastore_shared_cockroachdb() {
1057 let container = cockroach_db::CockroachDb::default()
1058 .with_platform(LINUX_AMD64)
1059 .start()
1060 .await
1061 .unwrap();
1062 let host = container.get_host().await.unwrap();
1063 let port = container.get_host_port_ipv4(26257).await.unwrap();
1064 let uri = format!("postgres://root@{host}:{port}/defaultdb?sslmode=disable");
1065 run_shared_datastore_test("cockroachdb", &uri);
1066 }
1067
1068 #[tokio::test]
1069 async fn datastore_shared_mysql() {
1070 let container = mysql::Mysql::default()
1071 .with_platform(LINUX_AMD64)
1072 .start()
1073 .await
1074 .unwrap();
1075 let host = container.get_host().await.unwrap();
1076 let port = container.get_host_port_ipv4(3306).await.unwrap();
1077 let uri = format!("root@tcp({host}:{port})/test?parseTime=true");
1079 run_shared_datastore_test("mysql", &uri);
1080 }
1081
1082 #[tokio::test]
1083 async fn datastore_shared_spanner() {
1084 let container = GenericImage::new("roryq/spanner-emulator", "latest")
1087 .with_exposed_port(9010u16.tcp())
1088 .with_wait_for(WaitFor::seconds(5))
1089 .with_platform(LINUX_AMD64)
1090 .with_env_var("SPANNER_PROJECT_ID", "test-project")
1091 .with_env_var("SPANNER_INSTANCE_ID", "test-instance")
1092 .with_env_var("SPANNER_DATABASE_ID", "test-db")
1093 .start()
1094 .await
1095 .unwrap();
1096 let host = container.get_host().await.unwrap();
1097 let port = container.get_host_port_ipv4(9010u16.tcp()).await.unwrap();
1098 let emulator_host = format!("{host}:{port}");
1099 let uri = "projects/test-project/instances/test-instance/databases/test-db".to_string();
1100
1101 run_migrate(
1102 "spanner",
1103 &uri,
1104 &[("datastore-spanner-emulator-host", &emulator_host)],
1105 )
1106 .expect("migration must succeed");
1107
1108 let opts = StartOptions {
1109 datastore: Some("spanner".into()),
1110 datastore_uri: Some(uri),
1111 spanner_emulator_host: Some(emulator_host),
1112 ..Default::default()
1113 };
1114
1115 let schema = TEST_SCHEMA;
1116 let db1 = EmbeddedSpiceDB::new(schema, &[], Some(&opts)).unwrap();
1117 let db2 = EmbeddedSpiceDB::new(schema, &[], Some(&opts)).unwrap();
1118
1119 db1.permissions()
1120 .write_relationships(&WriteRelationshipsRequest {
1121 updates: vec![RelationshipUpdate {
1122 operation: Operation::Touch as i32,
1123 relationship: Some(rel("document:shared", "reader", "user:alice")),
1124 }],
1125 optional_preconditions: vec![],
1126 optional_transaction_metadata: None,
1127 })
1128 .unwrap();
1129
1130 let r = db2
1131 .permissions()
1132 .check_permission(&check_req("document:shared", "read", "user:alice"))
1133 .unwrap();
1134 assert_eq!(r.permissionship, Permissionship::HasPermission as i32);
1135 }
1136 }
1137}