use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use query_router::{PaginationOptions, QueryResult, QueryRouter};
use crate::audit::{AuditEvent, AuditLogger};
use crate::auth;
use crate::config::AuthConfig;
use crate::convert::{
edge_to_proto, node_to_proto, paged_query_result_to_proto, query_result_to_proto, row_to_proto,
similar_to_proto, spatial_to_proto,
};
use crate::metrics::ServerMetrics;
use crate::proto::{
self, query_service_server::QueryService, BatchQueryRequest, BatchQueryResponse,
CloseCursorRequest, CloseCursorResponse, PaginatedQueryRequest, PaginatedQueryResponse,
QueryRequest, QueryResponse, QueryResponseChunk,
};
use crate::rate_limit::{Operation, RateLimiter};
use crate::service::health::HealthState;
const DEFAULT_STREAM_CHANNEL_CAPACITY: usize = 32;
const FAILURE_THRESHOLD: u32 = 5;
const MAX_BATCH_SIZE: usize = 100;
pub struct QueryServiceImpl {
router: Arc<RwLock<QueryRouter>>,
auth_config: Option<AuthConfig>,
stream_channel_capacity: usize,
health_state: Option<Arc<HealthState>>,
consecutive_failures: AtomicU32,
rate_limiter: Option<Arc<RateLimiter>>,
audit_logger: Option<Arc<AuditLogger>>,
metrics: Option<Arc<ServerMetrics>>,
}
impl QueryServiceImpl {
#[must_use]
pub const fn new(router: Arc<RwLock<QueryRouter>>) -> Self {
Self {
router,
auth_config: None,
stream_channel_capacity: DEFAULT_STREAM_CHANNEL_CAPACITY,
health_state: None,
consecutive_failures: AtomicU32::new(0),
rate_limiter: None,
audit_logger: None,
metrics: None,
}
}
#[must_use]
pub const fn with_auth(router: Arc<RwLock<QueryRouter>>, auth_config: AuthConfig) -> Self {
Self {
router,
auth_config: Some(auth_config),
stream_channel_capacity: DEFAULT_STREAM_CHANNEL_CAPACITY,
health_state: None,
consecutive_failures: AtomicU32::new(0),
rate_limiter: None,
audit_logger: None,
metrics: None,
}
}
#[must_use]
pub const fn with_config(
router: Arc<RwLock<QueryRouter>>,
auth_config: Option<AuthConfig>,
stream_channel_capacity: usize,
) -> Self {
Self {
router,
auth_config,
stream_channel_capacity,
health_state: None,
consecutive_failures: AtomicU32::new(0),
rate_limiter: None,
audit_logger: None,
metrics: None,
}
}
#[must_use]
pub const fn with_health_state(
router: Arc<RwLock<QueryRouter>>,
auth_config: Option<AuthConfig>,
stream_channel_capacity: usize,
health_state: Arc<HealthState>,
) -> Self {
Self {
router,
auth_config,
stream_channel_capacity,
health_state: Some(health_state),
consecutive_failures: AtomicU32::new(0),
rate_limiter: None,
audit_logger: None,
metrics: None,
}
}
#[must_use]
pub const fn with_full_config(
router: Arc<RwLock<QueryRouter>>,
auth_config: Option<AuthConfig>,
stream_channel_capacity: usize,
health_state: Arc<HealthState>,
rate_limiter: Option<Arc<RateLimiter>>,
audit_logger: Option<Arc<AuditLogger>>,
metrics: Option<Arc<ServerMetrics>>,
) -> Self {
Self {
router,
auth_config,
stream_channel_capacity,
health_state: Some(health_state),
consecutive_failures: AtomicU32::new(0),
rate_limiter,
audit_logger,
metrics,
}
}
fn record_success(&self) {
self.consecutive_failures.store(0, Ordering::SeqCst);
if let Some(ref health) = self.health_state {
health.set_query_service_healthy(true);
}
}
fn record_failure(&self) {
let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
if failures >= FAILURE_THRESHOLD {
if let Some(ref health) = self.health_state {
health.set_query_service_healthy(false);
tracing::warn!(
"Query service marked unhealthy after {} consecutive failures",
failures
);
}
}
}
fn execute_query(&self, query: &str, identity: Option<&str>) -> Result<QueryResult, Status> {
if let Some(id) = identity {
let mut router = self.router.write();
router.set_identity(id);
match router.execute(query) {
Ok(result) => {
self.record_success();
return Ok(result);
},
Err(e) => {
self.record_failure();
tracing::error!("Query execution error: {}", e);
return Err(crate::error::sanitize_internal_error(e));
},
}
}
let router = self.router.read();
match router.execute(query) {
Ok(result) => {
self.record_success();
Ok(result)
},
Err(e) => {
self.record_failure();
tracing::error!("Query execution error: {}", e);
Err(crate::error::sanitize_internal_error(e))
},
}
}
}
#[allow(clippy::too_many_lines)]
#[tonic::async_trait]
impl QueryService for QueryServiceImpl {
async fn execute(
&self,
request: Request<QueryRequest>,
) -> Result<Response<QueryResponse>, Status> {
let start = Instant::now();
let identity = match auth::validate_request_with_audit(
&request,
&self.auth_config,
self.rate_limiter.as_deref(),
self.audit_logger.as_deref(),
) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute", false, latency_ms);
}
return Err(status);
},
};
if let Some(ref limiter) = self.rate_limiter {
if let Some(ref id) = identity {
if let Err(msg) = limiter.check_and_record(id, Operation::Query) {
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::RateLimited {
identity: id.clone(),
operation: "query".to_string(),
},
None,
);
}
if let Some(ref m) = self.metrics {
m.record_rate_limited(id, "query");
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute", false, latency_ms);
}
return Err(Status::resource_exhausted(msg));
}
}
}
let query = &request.get_ref().query;
tracing::debug!("Executing query: {}", query);
let result = self.execute_query(query, identity.as_deref());
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute", result.is_ok(), latency_ms);
}
if let Some(ref logger) = self.audit_logger {
if logger.config().log_queries {
logger.record(
AuditEvent::QueryExecuted {
identity,
query: query.clone(),
},
None,
);
}
}
match result {
Ok(result) => Ok(Response::new(query_result_to_proto(result))),
Err(status) => Err(status),
}
}
type ExecuteStreamStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<QueryResponseChunk, Status>> + Send>>;
async fn execute_stream(
&self,
request: Request<QueryRequest>,
) -> Result<Response<Self::ExecuteStreamStream>, Status> {
let start = Instant::now();
let identity = match auth::validate_request_with_audit(
&request,
&self.auth_config,
self.rate_limiter.as_deref(),
self.audit_logger.as_deref(),
) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_stream", false, latency_ms);
}
return Err(status);
},
};
if let Some(ref limiter) = self.rate_limiter {
if let Some(ref id) = identity {
if let Err(msg) = limiter.check_and_record(id, Operation::Query) {
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::RateLimited {
identity: id.clone(),
operation: "query".to_string(),
},
None,
);
}
if let Some(ref m) = self.metrics {
m.record_rate_limited(id, "query");
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_stream", false, latency_ms);
}
return Err(Status::resource_exhausted(msg));
}
}
}
let query = &request.get_ref().query;
tracing::debug!("Executing streaming query: {}", query);
if let Some(ref logger) = self.audit_logger {
if logger.config().log_queries {
logger.record(
AuditEvent::QueryExecuted {
identity: identity.clone(),
query: query.clone(),
},
None,
);
}
}
let result = match self.execute_query(query, identity.as_deref()) {
Ok(r) => r,
Err(status) => {
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_stream", false, latency_ms);
}
return Err(status);
},
};
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_stream", true, latency_ms);
}
let (tx, rx) = mpsc::channel(self.stream_channel_capacity);
tokio::spawn(async move {
let send_result = match result {
QueryResult::Rows(rows) => {
for row in rows {
let chunk = QueryResponseChunk {
chunk: Some(proto::query_response_chunk::Chunk::Row(proto::RowChunk {
row: Some(row_to_proto(row)),
})),
is_final: false,
sequence_number: None,
};
if tx.send(Ok(chunk)).await.is_err() {
return;
}
}
true
},
QueryResult::Nodes(nodes) => {
for node in nodes {
let chunk = QueryResponseChunk {
chunk: Some(proto::query_response_chunk::Chunk::Node(
proto::NodeChunk {
node: Some(node_to_proto(node)),
},
)),
is_final: false,
sequence_number: None,
};
if tx.send(Ok(chunk)).await.is_err() {
return;
}
}
true
},
QueryResult::Edges(edges) => {
for edge in edges {
let chunk = QueryResponseChunk {
chunk: Some(proto::query_response_chunk::Chunk::Edge(
proto::EdgeChunk {
edge: Some(edge_to_proto(edge)),
},
)),
is_final: false,
sequence_number: None,
};
if tx.send(Ok(chunk)).await.is_err() {
return;
}
}
true
},
QueryResult::Similar(items) => {
for item in items {
let chunk = QueryResponseChunk {
chunk: Some(proto::query_response_chunk::Chunk::SimilarItem(
proto::SimilarChunk {
item: Some(similar_to_proto(item)),
},
)),
is_final: false,
sequence_number: None,
};
if tx.send(Ok(chunk)).await.is_err() {
return;
}
}
true
},
QueryResult::Blob(data) => {
for chunk_data in data.chunks(64 * 1024) {
let chunk = QueryResponseChunk {
chunk: Some(proto::query_response_chunk::Chunk::BlobData(
chunk_data.to_vec(),
)),
is_final: false,
sequence_number: None,
};
if tx.send(Ok(chunk)).await.is_err() {
return;
}
}
true
},
QueryResult::Spatial(items) => {
for item in items {
let chunk = QueryResponseChunk {
chunk: Some(proto::query_response_chunk::Chunk::SpatialItem(
proto::SpatialChunk {
item: Some(spatial_to_proto(item)),
},
)),
is_final: false,
sequence_number: None,
};
if tx.send(Ok(chunk)).await.is_err() {
return;
}
}
true
},
_ => {
let chunk = QueryResponseChunk {
chunk: Some(proto::query_response_chunk::Chunk::Error(
proto::ErrorInfo {
code: proto::ErrorCode::InvalidArgument.into(),
message: "Result type not supported for streaming".to_string(),
details: None,
},
)),
is_final: true,
sequence_number: None,
};
let _ = tx.send(Ok(chunk)).await;
false
},
};
if send_result {
let final_chunk = QueryResponseChunk {
chunk: None,
is_final: true,
sequence_number: None,
};
let _ = tx.send(Ok(final_chunk)).await;
}
});
let stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream)))
}
async fn execute_batch(
&self,
request: Request<BatchQueryRequest>,
) -> Result<Response<BatchQueryResponse>, Status> {
let start = Instant::now();
let request_identity = match auth::validate_request_with_audit(
&request,
&self.auth_config,
self.rate_limiter.as_deref(),
self.audit_logger.as_deref(),
) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_batch", false, latency_ms);
}
return Err(status);
},
};
let batch = request.into_inner();
if batch.queries.len() > MAX_BATCH_SIZE {
return Err(Status::invalid_argument(format!(
"Batch size {} exceeds maximum of {MAX_BATCH_SIZE}",
batch.queries.len()
)));
}
let mut results = Vec::with_capacity(batch.queries.len());
let mut all_succeeded = true;
for query_request in batch.queries {
let identity = request_identity.clone();
if let Some(ref limiter) = self.rate_limiter {
if let Some(ref id) = identity {
if let Err(msg) = limiter.check_and_record(id, Operation::Query) {
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::RateLimited {
identity: id.clone(),
operation: "query".to_string(),
},
None,
);
}
if let Some(ref m) = self.metrics {
m.record_rate_limited(id, "query");
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_batch", false, latency_ms);
}
return Err(Status::resource_exhausted(msg));
}
}
}
if let Some(ref logger) = self.audit_logger {
if logger.config().log_queries {
logger.record(
AuditEvent::QueryExecuted {
identity: identity.clone(),
query: query_request.query.clone(),
},
None,
);
}
}
let response = match self.execute_query(&query_request.query, identity.as_deref()) {
Ok(result) => query_result_to_proto(result),
Err(status) => {
all_succeeded = false;
QueryResponse {
result: None,
error: Some(proto::ErrorInfo {
code: status_to_error_code(&status).into(),
message: status.message().to_string(),
details: None,
}),
}
},
};
results.push(response);
}
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_batch", all_succeeded, latency_ms);
}
Ok(Response::new(BatchQueryResponse { results }))
}
async fn execute_paginated(
&self,
request: Request<PaginatedQueryRequest>,
) -> Result<Response<PaginatedQueryResponse>, Status> {
let start = Instant::now();
let identity = match auth::validate_request_with_audit(
&request,
&self.auth_config,
self.rate_limiter.as_deref(),
self.audit_logger.as_deref(),
) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_paginated", false, latency_ms);
}
return Err(status);
},
};
if let Some(ref limiter) = self.rate_limiter {
if let Some(ref id) = identity {
if let Err(msg) = limiter.check_and_record(id, Operation::Query) {
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::RateLimited {
identity: id.clone(),
operation: "query".to_string(),
},
None,
);
}
if let Some(ref m) = self.metrics {
m.record_rate_limited(id, "query");
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_paginated", false, latency_ms);
}
return Err(Status::resource_exhausted(msg));
}
}
}
let req = request.get_ref();
let query = &req.query;
tracing::debug!("Executing paginated query: {}", query);
let options = PaginationOptions {
cursor: req.cursor.clone(),
page_size: req.page_size.map(|s| s as usize),
count_total: req.count_total.unwrap_or(false),
cursor_ttl: req
.cursor_ttl_secs
.map(|s| Duration::from_secs(u64::from(s))),
};
if let Some(ref logger) = self.audit_logger {
if logger.config().log_queries {
logger.record(
AuditEvent::QueryExecuted {
identity: identity.clone(),
query: query.clone(),
},
None,
);
}
}
let result = {
let mut router = self.router.write();
if let Some(id) = identity.as_deref() {
router.set_identity(id);
}
router.execute_paginated(query, options)
};
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "execute_paginated", result.is_ok(), latency_ms);
}
match result {
Ok(paged_result) => {
self.record_success();
Ok(Response::new(paged_query_result_to_proto(paged_result)))
},
Err(e) => {
self.record_failure();
tracing::error!("Paginated query execution error: {}", e);
Err(crate::error::sanitize_internal_error(e))
},
}
}
async fn close_cursor(
&self,
request: Request<CloseCursorRequest>,
) -> Result<Response<CloseCursorResponse>, Status> {
let start = Instant::now();
let identity = match auth::validate_request_with_audit(
&request,
&self.auth_config,
self.rate_limiter.as_deref(),
self.audit_logger.as_deref(),
) {
Ok(id) => id,
Err(status) => {
if let Some(ref m) = self.metrics {
if status.code() == tonic::Code::Unauthenticated {
m.record_auth_failure("invalid_key");
}
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "close_cursor", false, latency_ms);
}
return Err(status);
},
};
let cursor = &request.get_ref().cursor;
tracing::debug!("Closing cursor: {}...", &cursor[..cursor.len().min(20)]);
let result = {
let router = self.router.read();
router.close_cursor(cursor)
};
if let Some(ref logger) = self.audit_logger {
logger.record(
AuditEvent::QueryExecuted {
identity,
query: format!("CLOSE CURSOR {}...", &cursor[..cursor.len().min(20)]),
},
None,
);
}
if let Some(ref m) = self.metrics {
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
m.record_request("query", "close_cursor", result.is_ok(), latency_ms);
}
match result {
Ok(success) => Ok(Response::new(CloseCursorResponse { success })),
Err(e) => {
tracing::warn!("Cursor close error: {}", e);
Ok(Response::new(CloseCursorResponse { success: false }))
},
}
}
}
fn status_to_error_code(status: &Status) -> proto::ErrorCode {
match status.code() {
tonic::Code::InvalidArgument => proto::ErrorCode::InvalidArgument,
tonic::Code::NotFound => proto::ErrorCode::NotFound,
tonic::Code::PermissionDenied => proto::ErrorCode::PermissionDenied,
tonic::Code::AlreadyExists => proto::ErrorCode::AlreadyExists,
tonic::Code::Unauthenticated => proto::ErrorCode::Unauthenticated,
tonic::Code::Unavailable => proto::ErrorCode::Unavailable,
_ => proto::ErrorCode::Internal,
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_router() -> Arc<RwLock<QueryRouter>> {
Arc::new(RwLock::new(QueryRouter::new()))
}
#[tokio::test]
async fn test_execute_create_table() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "CREATE TABLE users (name text, age int)".to_string(),
identity: None,
});
let response = service.execute(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_execute_invalid_query() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "INVALID QUERY SYNTAX!!!".to_string(),
identity: None,
});
let response = service.execute(request).await;
assert!(response.is_err());
}
#[tokio::test]
async fn test_execute_select() {
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE users (name text, age int)")
.unwrap();
r.execute("INSERT INTO users (name, age) VALUES ('Alice', 30)")
.unwrap();
}
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "SELECT * FROM users".to_string(),
identity: None,
});
let response = service.execute(request).await.unwrap();
let inner = response.into_inner();
assert!(matches!(
inner.result,
Some(proto::query_response::Result::Rows(_))
));
}
#[tokio::test]
async fn test_execute_batch() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
let request = Request::new(BatchQueryRequest {
queries: vec![
QueryRequest {
query: "CREATE TABLE batch_test (x int)".to_string(),
identity: None,
},
QueryRequest {
query: "INSERT INTO batch_test (x) VALUES (1)".to_string(),
identity: None,
},
QueryRequest {
query: "SELECT * FROM batch_test".to_string(),
identity: None,
},
],
});
let response = service.execute_batch(request).await.unwrap();
let inner = response.into_inner();
assert_eq!(inner.results.len(), 3);
}
#[tokio::test]
async fn test_execute_batch_with_auth() {
use crate::config::ApiKey;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"test-api-key-12345678".to_string(),
"user:alice".to_string(),
))
.with_anonymous(false);
let service = QueryServiceImpl::with_auth(router, auth_config);
let request = Request::new(BatchQueryRequest {
queries: vec![QueryRequest {
query: "CREATE TABLE batch_auth (x int)".to_string(),
identity: None,
}],
});
let response = service.execute_batch(request).await;
assert!(response.is_err());
assert_eq!(response.unwrap_err().code(), tonic::Code::Unauthenticated);
let mut request = Request::new(BatchQueryRequest {
queries: vec![QueryRequest {
query: "CREATE TABLE batch_auth (x int)".to_string(),
identity: None,
}],
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::try_from("test-api-key-12345678").expect("valid metadata value"),
);
let response = service.execute_batch(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_execute_batch_ignores_query_identity() {
use crate::config::ApiKey;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"test-api-key-12345678".to_string(),
"user:alice".to_string(),
))
.with_anonymous(false);
let service = QueryServiceImpl::with_auth(router, auth_config);
let mut request = Request::new(BatchQueryRequest {
queries: vec![QueryRequest {
query: "CREATE TABLE priv_test (x int)".to_string(),
identity: Some("user:evil".to_string()), }],
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::try_from("test-api-key-12345678").expect("valid metadata value"),
);
let response = service.execute_batch(request).await;
assert!(response.is_ok(), "Batch execution should succeed");
}
#[tokio::test]
async fn test_status_to_error_code() {
assert_eq!(
status_to_error_code(&Status::invalid_argument("test")),
proto::ErrorCode::InvalidArgument
);
assert_eq!(
status_to_error_code(&Status::not_found("test")),
proto::ErrorCode::NotFound
);
assert_eq!(
status_to_error_code(&Status::permission_denied("test")),
proto::ErrorCode::PermissionDenied
);
assert_eq!(
status_to_error_code(&Status::internal("test")),
proto::ErrorCode::Internal
);
}
#[tokio::test]
async fn test_health_state_on_failures() {
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let service =
QueryServiceImpl::with_health_state(router, None, 32, Arc::clone(&health_state));
assert!(health_state.is_query_service_healthy());
let request = Request::new(QueryRequest {
query: "CREATE TABLE health_test (x int)".to_string(),
identity: None,
});
let _ = service.execute(request).await;
assert!(health_state.is_query_service_healthy());
for _ in 0..FAILURE_THRESHOLD {
let request = Request::new(QueryRequest {
query: "INVALID QUERY!!!".to_string(),
identity: None,
});
let _ = service.execute(request).await;
}
assert!(!health_state.is_query_service_healthy());
let request = Request::new(QueryRequest {
query: "SELECT * FROM health_test".to_string(),
identity: None,
});
let _ = service.execute(request).await;
assert!(health_state.is_query_service_healthy());
}
#[test]
fn test_failure_tracking() {
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let service =
QueryServiceImpl::with_health_state(router, None, 32, Arc::clone(&health_state));
for i in 0..FAILURE_THRESHOLD {
service.record_failure();
if i < FAILURE_THRESHOLD - 1 {
assert!(
health_state.is_query_service_healthy(),
"Should be healthy before threshold"
);
}
}
assert!(!health_state.is_query_service_healthy());
service.record_success();
assert!(health_state.is_query_service_healthy());
}
#[tokio::test]
async fn test_execute_with_identity() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "CREATE TABLE id_test (x int)".to_string(),
identity: Some("test-user".to_string()),
});
let response = service.execute(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_execute_stream_rows() {
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE stream_test (name text, age int)")
.unwrap();
r.execute("INSERT INTO stream_test (name, age) VALUES ('Alice', 30)")
.unwrap();
r.execute("INSERT INTO stream_test (name, age) VALUES ('Bob', 25)")
.unwrap();
}
let service = QueryServiceImpl::with_config(router, None, 10);
let request = Request::new(QueryRequest {
query: "SELECT * FROM stream_test".to_string(),
identity: None,
});
let response = service.execute_stream(request).await.unwrap();
let mut stream = response.into_inner();
let mut chunks = vec![];
while let Some(chunk) = stream.next().await {
chunks.push(chunk.unwrap());
}
assert!(chunks.len() >= 2, "Expected at least 2 chunks");
assert!(chunks.last().unwrap().is_final);
}
#[tokio::test]
async fn test_execute_stream_non_streaming_result() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "CREATE TABLE stream_empty (x int)".to_string(),
identity: None,
});
let response = service.execute_stream(request).await.unwrap();
let mut stream = response.into_inner();
let mut found_final = false;
while let Some(chunk) = stream.next().await {
let c = chunk.unwrap();
if c.is_final {
found_final = true;
break;
}
}
assert!(found_final, "Should have final marker");
}
#[tokio::test]
async fn test_execute_with_auth_config() {
use crate::config::ApiKey;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"test-key-12345678".to_string(),
"user:alice".to_string(),
))
.with_anonymous(false);
let service = QueryServiceImpl::with_auth(router, auth_config);
let request = Request::new(QueryRequest {
query: "CREATE TABLE auth_test (x int)".to_string(),
identity: None,
});
let response = service.execute(request).await;
assert!(response.is_err());
assert_eq!(response.unwrap_err().code(), tonic::Code::Unauthenticated);
let mut request = Request::new(QueryRequest {
query: "CREATE TABLE auth_test (x int)".to_string(),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::try_from("test-key-12345678").expect("valid metadata value"),
);
let response = service.execute(request).await;
assert!(response.is_ok());
}
#[test]
fn test_status_to_error_code_additional() {
assert_eq!(
status_to_error_code(&Status::already_exists("test")),
proto::ErrorCode::AlreadyExists
);
assert_eq!(
status_to_error_code(&Status::unauthenticated("test")),
proto::ErrorCode::Unauthenticated
);
assert_eq!(
status_to_error_code(&Status::unavailable("test")),
proto::ErrorCode::Unavailable
);
assert_eq!(
status_to_error_code(&Status::cancelled("test")),
proto::ErrorCode::Internal
);
assert_eq!(
status_to_error_code(&Status::aborted("test")),
proto::ErrorCode::Internal
);
}
#[test]
fn test_query_service_constructors() {
let router = create_test_router();
let service = QueryServiceImpl::new(Arc::clone(&router));
assert!(service.auth_config.is_none());
assert!(service.health_state.is_none());
let service = QueryServiceImpl::with_config(Arc::clone(&router), None, 64);
assert_eq!(service.stream_channel_capacity, 64);
let health = Arc::new(HealthState::new());
let service = QueryServiceImpl::with_health_state(router, None, 32, Arc::clone(&health));
assert!(service.health_state.is_some());
}
#[tokio::test]
async fn test_execute_stream_invalid_query() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "INVALID QUERY!!!".to_string(),
identity: None,
});
let response = service.execute_stream(request).await;
assert!(response.is_err());
}
#[tokio::test]
async fn test_record_success_without_health_state() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
service.record_success();
}
#[tokio::test]
async fn test_record_failure_without_health_state() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
for _ in 0..10 {
service.record_failure();
}
}
#[tokio::test]
async fn test_execute_records_metrics() {
use crate::metrics::ServerMetrics;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let service = QueryServiceImpl::with_full_config(
router,
None,
32,
health_state,
None,
None,
Some(Arc::clone(&metrics)),
);
let request = Request::new(QueryRequest {
query: "CREATE TABLE metrics_test (x int)".to_string(),
identity: None,
});
let response = service.execute(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_execute_records_latency() {
use crate::metrics::ServerMetrics;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let service = QueryServiceImpl::with_full_config(
router,
None,
32,
health_state,
None,
None,
Some(metrics),
);
let request = Request::new(QueryRequest {
query: "CREATE TABLE latency_test (x int)".to_string(),
identity: None,
});
let response = service.execute(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_auth_failure_recorded() {
use crate::config::ApiKey;
use crate::metrics::ServerMetrics;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"test-api-key-12345678".to_string(),
"user:alice".to_string(),
))
.with_anonymous(false);
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let service = QueryServiceImpl::with_full_config(
router,
Some(auth_config),
32,
health_state,
None,
None,
Some(metrics),
);
let request = Request::new(QueryRequest {
query: "CREATE TABLE auth_fail_test (x int)".to_string(),
identity: None,
});
let response = service.execute(request).await;
assert!(response.is_err());
assert_eq!(response.unwrap_err().code(), tonic::Code::Unauthenticated);
}
#[tokio::test]
async fn test_rate_limit_recorded() {
use crate::config::ApiKey;
use crate::metrics::ServerMetrics;
use crate::rate_limit::{RateLimitConfig, RateLimiter};
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"test-api-key-12345678".to_string(),
"user:rate_test".to_string(),
));
let rate_limiter = Arc::new(RateLimiter::new(RateLimitConfig::new().with_max_queries(1)));
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let service = QueryServiceImpl::with_full_config(
router,
Some(auth_config),
32,
health_state,
Some(rate_limiter),
None,
Some(metrics),
);
let mut request = Request::new(QueryRequest {
query: "CREATE TABLE rate_test (x int)".to_string(),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("test-api-key-12345678"),
);
let response = service.execute(request).await;
assert!(response.is_ok());
let mut request = Request::new(QueryRequest {
query: "SELECT * FROM rate_test".to_string(),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("test-api-key-12345678"),
);
let response = service.execute(request).await;
assert!(response.is_err());
assert_eq!(response.unwrap_err().code(), tonic::Code::ResourceExhausted);
}
use tokio_stream::StreamExt;
#[test]
fn test_with_full_config_all_parameters() {
use crate::audit::{AuditConfig, AuditLogger};
use crate::metrics::ServerMetrics;
use crate::rate_limit::{RateLimitConfig, RateLimiter};
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
let router = create_test_router();
let auth_config = AuthConfig::new().with_anonymous(false);
let health_state = Arc::new(HealthState::new());
let rate_limiter = Arc::new(RateLimiter::new(RateLimitConfig::default()));
let audit_logger = Arc::new(AuditLogger::new(AuditConfig::default()));
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let service = QueryServiceImpl::with_full_config(
router,
Some(auth_config),
64,
Arc::clone(&health_state),
Some(Arc::clone(&rate_limiter)),
Some(Arc::clone(&audit_logger)),
Some(Arc::clone(&metrics)),
);
assert!(service.auth_config.is_some());
assert_eq!(service.stream_channel_capacity, 64);
assert!(service.health_state.is_some());
assert!(service.rate_limiter.is_some());
assert!(service.audit_logger.is_some());
assert!(service.metrics.is_some());
}
#[test]
fn test_with_full_config_minimal_parameters() {
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let service =
QueryServiceImpl::with_full_config(router, None, 32, health_state, None, None, None);
assert!(service.auth_config.is_none());
assert_eq!(service.stream_channel_capacity, 32);
assert!(service.health_state.is_some());
assert!(service.rate_limiter.is_none());
assert!(service.audit_logger.is_none());
assert!(service.metrics.is_none());
}
#[tokio::test]
async fn test_audit_logger_integration() {
use crate::audit::{AuditConfig, AuditLogger};
use crate::config::ApiKey;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let audit_logger = Arc::new(AuditLogger::new(AuditConfig::default()));
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"test-audit-key-123456".to_string(),
"user:audit_test".to_string(),
));
let service = QueryServiceImpl::with_full_config(
router,
Some(auth_config),
32,
health_state,
None,
Some(Arc::clone(&audit_logger)),
None,
);
let mut request = Request::new(QueryRequest {
query: "CREATE TABLE audit_test (x int)".to_string(),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("test-audit-key-123456"),
);
let response = service.execute(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_invalid_api_key() {
use crate::config::ApiKey;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"valid-key-123456789".to_string(),
"user:valid".to_string(),
))
.with_anonymous(false);
let service = QueryServiceImpl::with_full_config(
router,
Some(auth_config),
32,
health_state,
None,
None,
None,
);
let mut request = Request::new(QueryRequest {
query: "CREATE TABLE invalid_auth (x int)".to_string(),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("wrong-key-000000000"),
);
let response = service.execute(request).await;
assert!(response.is_err());
assert_eq!(response.unwrap_err().code(), tonic::Code::Unauthenticated);
}
#[tokio::test]
async fn test_missing_api_key_when_required() {
use crate::config::ApiKey;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"required-key-123456".to_string(),
"user:required".to_string(),
))
.with_anonymous(false);
let service = QueryServiceImpl::with_full_config(
router,
Some(auth_config),
32,
health_state,
None,
None,
None,
);
let request = Request::new(QueryRequest {
query: "CREATE TABLE missing_auth (x int)".to_string(),
identity: None,
});
let response = service.execute(request).await;
assert!(response.is_err());
assert_eq!(response.unwrap_err().code(), tonic::Code::Unauthenticated);
}
#[tokio::test]
async fn test_rate_limiter_enforcement() {
use crate::config::ApiKey;
use crate::rate_limit::{RateLimitConfig, RateLimiter};
use std::time::Duration;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"rate-limit-key-123456".to_string(),
"user:ratelimit".to_string(),
));
let rate_limiter = Arc::new(RateLimiter::new(
RateLimitConfig::new()
.with_max_queries(2)
.with_window(Duration::from_secs(60)),
));
let service = QueryServiceImpl::with_full_config(
router,
Some(auth_config),
32,
health_state,
Some(rate_limiter),
None,
None,
);
for i in 0..2 {
let mut request = Request::new(QueryRequest {
query: format!("CREATE TABLE rate{i} (x int)"),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("rate-limit-key-123456"),
);
let response = service.execute(request).await;
assert!(response.is_ok(), "Request {i} should succeed");
}
let mut request = Request::new(QueryRequest {
query: "CREATE TABLE rate_exceeded (x int)".to_string(),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("rate-limit-key-123456"),
);
let response = service.execute(request).await;
assert!(response.is_err());
assert_eq!(response.unwrap_err().code(), tonic::Code::ResourceExhausted);
}
#[tokio::test]
async fn test_all_components_together() {
use crate::audit::{AuditConfig, AuditLogger};
use crate::config::ApiKey;
use crate::metrics::ServerMetrics;
use crate::rate_limit::{RateLimitConfig, RateLimiter};
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let health_state = Arc::new(HealthState::new());
let auth_config = AuthConfig::new().with_api_key(ApiKey::new(
"integrated-key-123456".to_string(),
"user:integrated".to_string(),
));
let rate_limiter = Arc::new(RateLimiter::new(RateLimitConfig::default()));
let audit_logger = Arc::new(AuditLogger::new(AuditConfig::default()));
let provider = SdkMeterProvider::builder().build();
let meter = provider.meter("test");
let metrics = Arc::new(ServerMetrics::new(meter));
let service = QueryServiceImpl::with_full_config(
router,
Some(auth_config),
32,
health_state,
Some(rate_limiter),
Some(audit_logger),
Some(metrics),
);
let mut request = Request::new(QueryRequest {
query: "CREATE TABLE integrated_test (x int)".to_string(),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("integrated-key-123456"),
);
let response = service.execute(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_paginated_query() {
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE paginated (name text, val int)")
.unwrap();
for i in 0..20 {
r.execute(&format!(
"INSERT INTO paginated (name, val) VALUES ('item{i}', {i})"
))
.unwrap();
}
}
let service = QueryServiceImpl::new(router);
let request = Request::new(PaginatedQueryRequest {
query: "SELECT * FROM paginated".to_string(),
page_size: Some(5),
cursor: None,
identity: None,
count_total: None,
cursor_ttl_secs: None,
});
let response = service.execute_paginated(request).await;
assert!(response.is_ok());
let inner = response.unwrap().into_inner();
assert!(inner.result.is_some());
}
#[tokio::test]
async fn test_paginated_query_no_more_results() {
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE small_page (x int)").unwrap();
r.execute("INSERT INTO small_page (x) VALUES (1)").unwrap();
}
let service = QueryServiceImpl::new(router);
let request = Request::new(PaginatedQueryRequest {
query: "SELECT * FROM small_page".to_string(),
page_size: Some(100),
cursor: None,
identity: None,
count_total: None,
cursor_ttl_secs: None,
});
let response = service.execute_paginated(request).await;
assert!(response.is_ok());
let inner = response.unwrap().into_inner();
assert!(inner.next_cursor.is_none() || !inner.has_more);
}
#[tokio::test]
async fn test_close_cursor() {
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE cursor_test (x int)").unwrap();
for i in 0..10 {
r.execute(&format!("INSERT INTO cursor_test (x) VALUES ({i})"))
.unwrap();
}
}
let service = QueryServiceImpl::new(router);
let request = Request::new(PaginatedQueryRequest {
query: "SELECT * FROM cursor_test".to_string(),
page_size: Some(3),
cursor: None,
identity: None,
count_total: None,
cursor_ttl_secs: None,
});
let response = service.execute_paginated(request).await;
assert!(response.is_ok());
let inner = response.unwrap().into_inner();
let cursor = inner.next_cursor;
if let Some(cursor_str) = cursor {
let close_request = Request::new(CloseCursorRequest { cursor: cursor_str });
let close_response = service.close_cursor(close_request).await;
assert!(close_response.is_ok());
}
}
#[tokio::test]
async fn test_close_invalid_cursor() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
let request = Request::new(CloseCursorRequest {
cursor: "invalid_cursor_id_12345".to_string(),
});
let response = service.close_cursor(request).await;
assert!(response.is_ok());
let inner = response.unwrap().into_inner();
assert!(!inner.success);
}
#[tokio::test]
async fn test_execute_stream_with_auth() {
use crate::config::ApiKey;
use tonic::metadata::MetadataValue;
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE stream_auth (x int)").unwrap();
r.execute("INSERT INTO stream_auth (x) VALUES (1)").unwrap();
}
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"stream-auth-key-12345".to_string(),
"user:stream".to_string(),
))
.with_anonymous(false);
let service = QueryServiceImpl::with_auth(router, auth_config);
let request = Request::new(QueryRequest {
query: "SELECT * FROM stream_auth".to_string(),
identity: None,
});
let response = service.execute_stream(request).await;
assert!(response.is_err());
let mut request = Request::new(QueryRequest {
query: "SELECT * FROM stream_auth".to_string(),
identity: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("stream-auth-key-12345"),
);
let response = service.execute_stream(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_paginated_query_with_auth() {
use crate::config::ApiKey;
use tonic::metadata::MetadataValue;
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE page_auth (x int)").unwrap();
r.execute("INSERT INTO page_auth (x) VALUES (1)").unwrap();
}
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"page-auth-key-123456".to_string(),
"user:page".to_string(),
))
.with_anonymous(false);
let service = QueryServiceImpl::with_auth(router, auth_config);
let request = Request::new(PaginatedQueryRequest {
query: "SELECT * FROM page_auth".to_string(),
page_size: Some(10),
cursor: None,
identity: None,
count_total: None,
cursor_ttl_secs: None,
});
let response = service.execute_paginated(request).await;
assert!(response.is_err());
let mut request = Request::new(PaginatedQueryRequest {
query: "SELECT * FROM page_auth".to_string(),
page_size: Some(10),
cursor: None,
identity: None,
count_total: None,
cursor_ttl_secs: None,
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("page-auth-key-123456"),
);
let response = service.execute_paginated(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_close_cursor_with_auth() {
use crate::config::ApiKey;
use tonic::metadata::MetadataValue;
let router = create_test_router();
let auth_config = AuthConfig::new()
.with_api_key(ApiKey::new(
"close-auth-key-123456".to_string(),
"user:close".to_string(),
))
.with_anonymous(false);
let service = QueryServiceImpl::with_auth(router, auth_config);
let request = Request::new(CloseCursorRequest {
cursor: "any_cursor".to_string(),
});
let response = service.close_cursor(request).await;
assert!(response.is_err());
let mut request = Request::new(CloseCursorRequest {
cursor: "any_cursor".to_string(),
});
request.metadata_mut().insert(
"x-api-key",
MetadataValue::from_static("close-auth-key-123456"),
);
let response = service.close_cursor(request).await;
assert!(response.is_ok());
}
#[tokio::test]
async fn test_execute_stream_multiple_rows() {
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE stream_rows (name text, value int)")
.unwrap();
for i in 0..5 {
r.execute(&format!(
"INSERT INTO stream_rows (name, value) VALUES ('row{i}', {i})"
))
.unwrap();
}
}
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "SELECT * FROM stream_rows".to_string(),
identity: None,
});
let response = service.execute_stream(request).await.unwrap();
let mut stream = response.into_inner();
let mut chunks = vec![];
while let Some(chunk) = stream.next().await {
chunks.push(chunk.unwrap());
}
assert!(!chunks.is_empty());
}
#[tokio::test]
async fn test_execute_stream_with_filter() {
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE filtered_stream (x int)").unwrap();
for i in 0..10 {
r.execute(&format!("INSERT INTO filtered_stream (x) VALUES ({i})"))
.unwrap();
}
}
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "SELECT * FROM filtered_stream WHERE x > 5".to_string(),
identity: None,
});
let response = service.execute_stream(request).await.unwrap();
let mut stream = response.into_inner();
let mut chunks = vec![];
while let Some(chunk) = stream.next().await {
chunks.push(chunk.unwrap());
}
assert!(!chunks.is_empty());
}
#[tokio::test]
async fn test_execute_with_identity_error_path() {
let router = create_test_router();
let service = QueryServiceImpl::new(router);
let request = Request::new(QueryRequest {
query: "INVALID_COMMAND_THAT_DOES_NOT_EXIST".to_string(),
identity: Some("test-user".to_string()),
});
let response = service.execute(request).await;
assert!(response.is_err());
}
#[tokio::test]
async fn test_execute_stream_nodes() {
let router = create_test_router();
{
let r = router.write();
r.execute("CREATE TABLE stream_nodes (name text)").unwrap();
r.execute("INSERT INTO stream_nodes (name) VALUES ('Alice')")
.unwrap();
r.execute("INSERT INTO stream_nodes (name) VALUES ('Bob')")
.unwrap();
}
let service = QueryServiceImpl::with_config(router, None, 10);
let request = Request::new(QueryRequest {
query: "SELECT * FROM stream_nodes".to_string(),
identity: None,
});
let response = service.execute_stream(request).await.unwrap();
let mut stream = response.into_inner();
let mut chunks = vec![];
while let Some(chunk) = stream.next().await {
chunks.push(chunk.unwrap());
}
assert!(!chunks.is_empty());
}
#[tokio::test]
async fn test_execute_stream_edges() {
let router = create_test_router();
{
let r = router.write();
r.execute("NODE CREATE person { name: 'Alice' }").unwrap();
r.execute("NODE CREATE person { name: 'Bob' }").unwrap();
r.execute("EDGE CREATE 1 -> 2 : knows").unwrap();
}
let service = QueryServiceImpl::with_config(router, None, 10);
let request = Request::new(QueryRequest {
query: "EDGE GET 1".to_string(),
identity: None,
});
let response = service.execute_stream(request).await.unwrap();
let mut stream = response.into_inner();
let mut chunks = vec![];
while let Some(chunk) = stream.next().await {
chunks.push(chunk.unwrap());
}
assert!(!chunks.is_empty());
}
#[test]
fn test_default_stream_channel_capacity() {
assert_eq!(DEFAULT_STREAM_CHANNEL_CAPACITY, 32);
}
#[test]
fn test_failure_threshold() {
assert_eq!(FAILURE_THRESHOLD, 5);
}
}