use crate::models::{
CosmosOperation, CosmosResponseHeaders, OperationType, ResourceType, SessionToken,
};
use super::session_container::SessionContainer;
fn is_reading_from_master(resource_type: ResourceType, operation_type: OperationType) -> bool {
match resource_type {
ResourceType::DatabaseAccount | ResourceType::Database | ResourceType::Offer => true,
ResourceType::PartitionKeyRange => true,
ResourceType::DocumentCollection => matches!(
operation_type,
OperationType::ReadFeed | OperationType::Query | OperationType::SqlQuery
),
_ => false,
}
}
#[derive(Debug)]
pub(crate) struct SessionManager {
container: SessionContainer,
}
impl SessionManager {
pub(crate) fn new() -> Self {
Self {
container: SessionContainer::new(),
}
}
pub(crate) fn resolve_session_token(
&self,
operation: &CosmosOperation,
user_token: Option<&SessionToken>,
) -> Option<SessionToken> {
if let Some(token) = user_token {
return Some(token.clone());
}
let container = operation.container()?;
self.container.resolve_session_token(container)
}
pub(crate) fn capture_session_token(
&self,
operation: &CosmosOperation,
headers: &CosmosResponseHeaders,
) {
if is_reading_from_master(operation.resource_type(), operation.operation_type()) {
return;
}
let session_token = match &headers.session_token {
Some(t) => t.as_str(),
None => return,
};
let container = match operation.container() {
Some(c) => c,
None => return,
};
self.container.set_session_token(container, session_token);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::{
AccountReference, ContainerProperties, ContainerReference, CosmosOperation,
CosmosResponseHeaders, DatabaseReference, ItemReference, OperationType, PartitionKey,
PartitionKeyDefinition, ResourceType, SessionToken, SystemProperties,
};
use url::Url;
fn test_container() -> ContainerReference {
let account = AccountReference::with_master_key(
Url::parse("https://test.documents.azure.com:443/").unwrap(),
"dGVzdA==",
);
let pk_def: PartitionKeyDefinition = serde_json::from_str(r#"{"paths":["/pk"]}"#).unwrap();
let props = ContainerProperties {
id: "coll1".into(),
partition_key: pk_def,
system_properties: SystemProperties::default(),
};
ContainerReference::new(account, "db1", "db_rid1", "coll1", "coll_rid1", &props)
}
fn make_response_headers(
session_token: Option<&str>,
owner_id: Option<&str>,
owner_full_name: Option<&str>,
) -> CosmosResponseHeaders {
CosmosResponseHeaders {
session_token: session_token.map(|s| SessionToken::new(s.to_owned())),
owner_id: owner_id.map(|s| s.to_owned()),
owner_full_name: owner_full_name.map(|s| s.to_owned()),
..Default::default()
}
}
#[test]
fn resolve_returns_none_when_empty() {
let mgr = SessionManager::new();
let container = test_container();
let op = CosmosOperation::read_item(ItemReference::from_name(
&container,
PartitionKey::from("pk1"),
"doc1",
));
assert!(mgr.resolve_session_token(&op, None).is_none());
}
#[test]
fn user_token_takes_precedence() {
let mgr = SessionManager::new();
let container = test_container();
let op = CosmosOperation::read_item(ItemReference::from_name(
&container,
PartitionKey::from("pk1"),
"doc1",
));
let user_token = SessionToken::new("user-provided");
let result = mgr.resolve_session_token(&op, Some(&user_token));
assert_eq!(result.unwrap().as_str(), "user-provided");
}
#[test]
fn capture_and_resolve() {
let mgr = SessionManager::new();
let container = test_container();
let op = CosmosOperation::read_item(ItemReference::from_name(
&container,
PartitionKey::from("pk1"),
"doc1",
));
let headers = make_response_headers(
Some("0:1#100#1=10"),
Some("coll_rid1"),
Some("dbs/db1/colls/coll1"),
);
mgr.capture_session_token(&op, &headers);
let token = mgr.resolve_session_token(&op, None).unwrap();
assert_eq!(token.as_str(), "0:1#100#1=10");
}
#[test]
fn capture_skips_missing_session_token() {
let mgr = SessionManager::new();
let container = test_container();
let op = CosmosOperation::read_item(ItemReference::from_name(
&container,
PartitionKey::from("pk1"),
"doc1",
));
let headers = make_response_headers(None, Some("coll_rid1"), None);
mgr.capture_session_token(&op, &headers);
assert!(mgr.resolve_session_token(&op, None).is_none());
}
#[test]
fn merge_on_capture() {
let mgr = SessionManager::new();
let container = test_container();
let op = CosmosOperation::read_item(ItemReference::from_name(
&container,
PartitionKey::from("pk1"),
"doc1",
));
let h1 = make_response_headers(
Some("0:1#100#1=10"),
Some("coll_rid1"),
Some("dbs/db1/colls/coll1"),
);
mgr.capture_session_token(&op, &h1);
let h2 = make_response_headers(
Some("0:1#200#1=20"),
Some("coll_rid1"),
Some("dbs/db1/colls/coll1"),
);
mgr.capture_session_token(&op, &h2);
let token = mgr.resolve_session_token(&op, None).unwrap();
assert!(token.as_str().contains("200"));
}
#[test]
fn resolve_via_name_fallback() {
let mgr = SessionManager::new();
let account = AccountReference::with_master_key(
Url::parse("https://test.documents.azure.com:443/").unwrap(),
"dGVzdA==",
);
let pk_def: PartitionKeyDefinition = serde_json::from_str(r#"{"paths":["/pk"]}"#).unwrap();
let props = ContainerProperties {
id: "coll1".into(),
partition_key: pk_def.clone(),
system_properties: SystemProperties::default(),
};
let c_capture = ContainerReference::new(
account.clone(),
"db1",
"db_rid1",
"coll1",
"original_rid",
&props,
);
let op_capture = CosmosOperation::read_item(ItemReference::from_name(
&c_capture,
PartitionKey::from("pk1"),
"doc1",
));
let headers = make_response_headers(
Some("0:1#100"),
Some("original_rid"),
Some("dbs/db1/colls/coll1"),
);
mgr.capture_session_token(&op_capture, &headers);
let props2 = ContainerProperties {
id: "coll1".into(),
partition_key: pk_def,
system_properties: SystemProperties::default(),
};
let c_resolve =
ContainerReference::new(account, "db1", "db_rid1", "coll1", "different_rid", &props2);
let op_resolve = CosmosOperation::read_item(ItemReference::from_name(
&c_resolve,
PartitionKey::from("pk1"),
"doc1",
));
let token = mgr.resolve_session_token(&op_resolve, None).unwrap();
assert_eq!(token.as_str(), "0:1#100");
}
#[test]
fn capture_uses_container_reference_rid() {
let mgr = SessionManager::new();
let container = test_container();
let op = CosmosOperation::read_item(ItemReference::from_name(
&container,
PartitionKey::from("pk1"),
"doc1",
));
let headers = make_response_headers(
Some("0:1#100"),
Some("some_other_rid"),
Some("dbs/db1/colls/coll1"),
);
mgr.capture_session_token(&op, &headers);
let token = mgr.resolve_session_token(&op, None).unwrap();
assert_eq!(token.as_str(), "0:1#100");
}
#[test]
fn capture_succeeds_without_owner_id() {
let mgr = SessionManager::new();
let container = test_container();
let op = CosmosOperation::read_item(ItemReference::from_name(
&container,
PartitionKey::from("pk1"),
"doc1",
));
let headers = make_response_headers(Some("0:1#100"), None, None);
mgr.capture_session_token(&op, &headers);
assert!(mgr.resolve_session_token(&op, None).is_some());
}
#[test]
fn multiple_containers_isolated() {
let mgr = SessionManager::new();
let account = AccountReference::with_master_key(
Url::parse("https://test.documents.azure.com:443/").unwrap(),
"dGVzdA==",
);
let pk_def: PartitionKeyDefinition = serde_json::from_str(r#"{"paths":["/pk"]}"#).unwrap();
let props1 = ContainerProperties {
id: "coll1".into(),
partition_key: pk_def.clone(),
system_properties: SystemProperties::default(),
};
let c1 = ContainerReference::new(
account.clone(),
"db1",
"db_rid1",
"coll1",
"coll_rid1",
&props1,
);
let props2 = ContainerProperties {
id: "coll2".into(),
partition_key: pk_def,
system_properties: SystemProperties::default(),
};
let c2 = ContainerReference::new(account, "db1", "db_rid1", "coll2", "coll_rid2", &props2);
let op1 = CosmosOperation::read_item(ItemReference::from_name(
&c1,
PartitionKey::from("pk1"),
"doc1",
));
let op2 = CosmosOperation::read_item(ItemReference::from_name(
&c2,
PartitionKey::from("pk1"),
"doc1",
));
let h1 = make_response_headers(
Some("0:1#100"),
Some("coll_rid1"),
Some("dbs/db1/colls/coll1"),
);
mgr.capture_session_token(&op1, &h1);
let h2 = make_response_headers(
Some("0:1#999"),
Some("coll_rid2"),
Some("dbs/db1/colls/coll2"),
);
mgr.capture_session_token(&op2, &h2);
let t1 = mgr.resolve_session_token(&op1, None).unwrap();
let t2 = mgr.resolve_session_token(&op2, None).unwrap();
assert!(t1.as_str().contains("100"));
assert!(t2.as_str().contains("999"));
}
#[test]
fn capture_compound_token() {
let mgr = SessionManager::new();
let container = test_container();
let op = CosmosOperation::read_item(ItemReference::from_name(
&container,
PartitionKey::from("pk1"),
"doc1",
));
let headers = make_response_headers(
Some("0:1#100#1=10,1:1#200#1=20"),
Some("coll_rid1"),
Some("dbs/db1/colls/coll1"),
);
mgr.capture_session_token(&op, &headers);
let token = mgr.resolve_session_token(&op, None).unwrap();
assert!(token.as_str().contains("0:") && token.as_str().contains("1:"));
}
#[test]
fn master_resources_always_reading_from_master() {
for rt in [
ResourceType::DatabaseAccount,
ResourceType::Database,
ResourceType::Offer,
] {
for ot in [
OperationType::Read,
OperationType::Create,
OperationType::Delete,
OperationType::ReadFeed,
OperationType::Query,
] {
assert!(
is_reading_from_master(rt, ot),
"{rt:?}/{ot:?} should be master"
);
}
}
}
#[test]
fn partition_key_range_always_reading_from_master() {
assert!(is_reading_from_master(
ResourceType::PartitionKeyRange,
OperationType::ReadFeed,
));
assert!(is_reading_from_master(
ResourceType::PartitionKeyRange,
OperationType::Read,
));
}
#[test]
fn document_collection_read_feed_query_is_master() {
for ot in [
OperationType::ReadFeed,
OperationType::Query,
OperationType::SqlQuery,
] {
assert!(
is_reading_from_master(ResourceType::DocumentCollection, ot),
"DocumentCollection/{ot:?} should be master"
);
}
}
#[test]
fn document_collection_crud_is_not_master() {
for ot in [
OperationType::Create,
OperationType::Read,
OperationType::Replace,
OperationType::Delete,
] {
assert!(
!is_reading_from_master(ResourceType::DocumentCollection, ot),
"DocumentCollection/{ot:?} should NOT be master"
);
}
}
#[test]
fn data_plane_resources_never_master() {
for rt in [
ResourceType::Document,
ResourceType::StoredProcedure,
ResourceType::Trigger,
ResourceType::UserDefinedFunction,
] {
assert!(
!is_reading_from_master(rt, OperationType::Read),
"{rt:?} should not be master"
);
}
}
#[test]
fn capture_skips_container_create_without_reference() {
let mgr = SessionManager::new();
let account = AccountReference::with_master_key(
Url::parse("https://test.documents.azure.com:443/").unwrap(),
"dGVzdA==",
);
let db = DatabaseReference::from_name(account.clone(), "db1");
let op = CosmosOperation::create_container(db);
assert!(!is_reading_from_master(
op.resource_type(),
op.operation_type()
));
let headers = make_response_headers(
Some("0:1#100"),
Some("coll_rid_new"),
Some("dbs/db1/colls/new_coll"),
);
mgr.capture_session_token(&op, &headers);
let pk_def: PartitionKeyDefinition = serde_json::from_str(r#"{"paths":["/pk"]}"#).unwrap();
let props = ContainerProperties {
id: "new_coll".into(),
partition_key: pk_def,
system_properties: SystemProperties::default(),
};
let probe = ContainerReference::new(
account,
"db1",
"db_rid1",
"new_coll",
"coll_rid_new",
&props,
);
let probe_op = CosmosOperation::read_item(ItemReference::from_name(
&probe,
PartitionKey::from("pk1"),
"doc1",
));
assert!(mgr.resolve_session_token(&probe_op, None).is_none());
}
#[test]
fn capture_skipped_for_container_read_feed() {
let mgr = SessionManager::new();
let account = AccountReference::with_master_key(
Url::parse("https://test.documents.azure.com:443/").unwrap(),
"dGVzdA==",
);
let db = DatabaseReference::from_name(account.clone(), "db1");
let op = CosmosOperation::read_all_containers(db);
assert!(is_reading_from_master(
op.resource_type(),
op.operation_type()
));
let headers = make_response_headers(
Some("0:1#100"),
Some("coll_rid"),
Some("dbs/db1/colls/coll1"),
);
mgr.capture_session_token(&op, &headers);
let pk_def: PartitionKeyDefinition = serde_json::from_str(r#"{"paths":["/pk"]}"#).unwrap();
let props = ContainerProperties {
id: "coll1".into(),
partition_key: pk_def,
system_properties: SystemProperties::default(),
};
let probe = ContainerReference::new(account, "db1", "db_rid1", "coll1", "coll_rid", &props);
let probe_op = CosmosOperation::read_item(ItemReference::from_name(
&probe,
PartitionKey::from("pk1"),
"doc1",
));
assert!(mgr.resolve_session_token(&probe_op, None).is_none());
}
}