use std::collections::HashSet;
use crate::leased_resource::LeasedResourceStatus;
use crate::session_resource::{SessionResourceFilter, SessionResourceStatus};
use crate::tools::ToolExecutionResult;
use crate::traits::ToolContext;
pub const LEASED_RESOURCE_PROVIDER_KEY: &str = "leased_resource_provider";
pub const LEASED_RESOURCE_TYPE_KEY: &str = "leased_resource_type";
pub const LEASED_RESOURCE_EXTERNAL_ID_KEY: &str = "leased_resource_external_id";
pub const LEASED_RESOURCE_ID_KEY: &str = "leased_resource_id";
pub async fn list_owned_external_resource_ids(
context: &ToolContext,
provider: &str,
resource_type: &str,
) -> Result<Option<HashSet<String>>, ToolExecutionResult> {
if let Some(store) = context.leased_resource_store.as_ref() {
let resources = store
.list_resources(context.session_id)
.await
.map_err(|e| {
ToolExecutionResult::internal_error_msg(format!(
"Failed to read leased resources: {e}"
))
})?;
let owned = resources
.into_iter()
.filter(|resource| {
resource.provider == provider
&& resource.resource_type == resource_type
&& resource.status == LeasedResourceStatus::Active
})
.map(|resource| resource.external_id)
.collect();
return Ok(Some(owned));
}
if let Some(registry) = context.session_resource_registry.as_ref() {
let filter = SessionResourceFilter {
kind: Some(resource_type.to_string()),
status: Some(SessionResourceStatus::Active),
};
let entries = registry
.list(context.session_id, Some(&filter))
.await
.map_err(|e| {
ToolExecutionResult::internal_error_msg(format!(
"Failed to read session resources: {e}"
))
})?;
let mut owned = HashSet::new();
let mut saw_untracked_entry = false;
for entry in entries {
let Some(metadata) = entry.metadata.as_object() else {
saw_untracked_entry = true;
continue;
};
let Some(entry_provider) = metadata
.get(LEASED_RESOURCE_PROVIDER_KEY)
.and_then(|value| value.as_str())
else {
saw_untracked_entry = true;
continue;
};
let Some(entry_resource_type) = metadata
.get(LEASED_RESOURCE_TYPE_KEY)
.and_then(|value| value.as_str())
else {
saw_untracked_entry = true;
continue;
};
let Some(external_id) = metadata
.get(LEASED_RESOURCE_EXTERNAL_ID_KEY)
.and_then(|value| value.as_str())
else {
saw_untracked_entry = true;
continue;
};
if entry_provider == provider && entry_resource_type == resource_type {
owned.insert(external_id.to_owned());
}
}
if saw_untracked_entry {
return Ok(None);
}
return Ok(Some(owned));
}
Ok(None)
}
pub async fn verify_owned_external_resource_if_available(
context: &ToolContext,
provider: &str,
resource_type: &str,
external_id: &str,
) -> Result<(), ToolExecutionResult> {
if let Some(owned_ids) =
list_owned_external_resource_ids(context, provider, resource_type).await?
&& !owned_ids.contains(external_id)
{
return Err(resource_not_owned_error(external_id));
}
Ok(())
}
pub async fn require_owned_external_resource(
context: &ToolContext,
provider: &str,
resource_type: &str,
external_id: &str,
) -> Result<(), ToolExecutionResult> {
let Some(owned_ids) =
list_owned_external_resource_ids(context, provider, resource_type).await?
else {
return Err(ownership_tracking_unavailable_error(
provider,
resource_type,
));
};
if owned_ids.contains(external_id) {
Ok(())
} else {
Err(resource_not_owned_error(external_id))
}
}
pub fn resource_not_owned_error(external_id: &str) -> ToolExecutionResult {
ToolExecutionResult::tool_error(format!(
"Resource {external_id} was not created by this session"
))
}
pub fn ownership_tracking_unavailable_error(
provider: &str,
resource_type: &str,
) -> ToolExecutionResult {
ToolExecutionResult::tool_error(format!(
"Session resource tracking is unavailable; cannot verify ownership for {provider} {resource_type} resources"
))
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use chrono::{TimeDelta, Utc};
use serde_json::json;
use std::sync::Arc;
use crate::error::Result;
use crate::leased_resource::{LeasedResource, LeasedResourceStatus, UpsertLeasedResource};
use crate::session_resource::{RegisterSessionResource, SessionResourceEntry};
use crate::traits::{LeasedResourceStore, SessionResourceRegistry};
use crate::typed_id::{LeasedResourceId, SessionId};
#[derive(Default)]
struct TestLeasedResourceStore {
resources: tokio::sync::Mutex<Vec<LeasedResource>>,
}
#[async_trait]
impl LeasedResourceStore for TestLeasedResourceStore {
async fn upsert_resource(&self, input: UpsertLeasedResource) -> Result<LeasedResource> {
let now = Utc::now();
let resource = LeasedResource {
id: LeasedResourceId::new(),
session_id: Some(input.session_id),
provider: input.provider,
resource_type: input.resource_type,
external_id: input.external_id,
display_name: input.display_name,
status: LeasedResourceStatus::Active,
owner_user_id: input.owner_user_id,
lease_duration_seconds: input.lease_duration_seconds,
last_touched_at: now,
lease_expires_at: now + TimeDelta::seconds(i64::from(input.lease_duration_seconds)),
cleanup_started_at: None,
cleanup_completed_at: None,
cleanup_attempts: 0,
last_cleanup_error: None,
metadata: input.metadata,
created_at: now,
updated_at: now,
};
self.resources.lock().await.push(resource.clone());
Ok(resource)
}
async fn release_resource(
&self,
_session_id: SessionId,
_provider: &str,
_resource_type: &str,
_external_id: &str,
) -> Result<Option<LeasedResource>> {
Ok(None)
}
async fn list_resources(&self, session_id: SessionId) -> Result<Vec<LeasedResource>> {
Ok(self
.resources
.lock()
.await
.iter()
.filter(|resource| resource.session_id == Some(session_id))
.cloned()
.collect())
}
}
#[derive(Default)]
struct TestSessionResourceRegistry {
entries: tokio::sync::Mutex<Vec<SessionResourceEntry>>,
}
#[async_trait]
impl SessionResourceRegistry for TestSessionResourceRegistry {
async fn register(&self, entry: RegisterSessionResource) -> Result<SessionResourceEntry> {
let now = Utc::now();
let entry = SessionResourceEntry {
resource_id: entry.resource_id,
session_id: entry.session_id,
kind: entry.kind,
display_name: entry.display_name,
status: entry.status,
metadata: entry.metadata,
created_at: now,
updated_at: now,
};
self.entries.lock().await.push(entry.clone());
Ok(entry)
}
async fn update_status(
&self,
_session_id: SessionId,
_resource_id: &str,
_status: SessionResourceStatus,
) -> Result<Option<SessionResourceEntry>> {
Ok(None)
}
async fn get(
&self,
_session_id: SessionId,
_resource_id: &str,
) -> Result<Option<SessionResourceEntry>> {
Ok(None)
}
async fn list(
&self,
session_id: SessionId,
filter: Option<&SessionResourceFilter>,
) -> Result<Vec<SessionResourceEntry>> {
let entries = self.entries.lock().await;
Ok(entries
.iter()
.filter(|entry| entry.session_id == session_id)
.filter(|entry| {
filter
.and_then(|filter| filter.kind.as_deref())
.is_none_or(|kind| entry.kind == kind)
})
.filter(|entry| {
filter
.and_then(|filter| filter.status)
.is_none_or(|status| entry.status == status)
})
.cloned()
.collect())
}
async fn deregister(&self, _session_id: SessionId, _resource_id: &str) -> Result<bool> {
Ok(false)
}
}
#[tokio::test]
async fn list_owned_ids_prefers_leased_resources() {
let session_id = SessionId::new();
let store = Arc::new(TestLeasedResourceStore::default());
store
.upsert_resource(UpsertLeasedResource {
session_id,
provider: "daytona".to_string(),
resource_type: "sandbox".to_string(),
external_id: "sb_test".to_string(),
display_name: None,
owner_user_id: None,
lease_duration_seconds: 60,
metadata: json!({}),
})
.await
.unwrap();
let context = ToolContext::new(session_id).with_leased_resource_store(store);
let owned = list_owned_external_resource_ids(&context, "daytona", "sandbox")
.await
.unwrap()
.unwrap();
assert!(owned.contains("sb_test"));
}
#[tokio::test]
async fn list_owned_ids_falls_back_to_session_resources() {
let session_id = SessionId::new();
let registry = Arc::new(TestSessionResourceRegistry::default());
registry
.register(RegisterSessionResource {
session_id,
resource_id: "resource_1".to_string(),
kind: "sandbox".to_string(),
display_name: "sandbox".to_string(),
status: SessionResourceStatus::Active,
metadata: json!({
LEASED_RESOURCE_PROVIDER_KEY: "daytona",
LEASED_RESOURCE_TYPE_KEY: "sandbox",
LEASED_RESOURCE_EXTERNAL_ID_KEY: "sb_test",
}),
})
.await
.unwrap();
let context = ToolContext::new(session_id).with_session_resource_registry(registry);
let owned = list_owned_external_resource_ids(&context, "daytona", "sandbox")
.await
.unwrap()
.unwrap();
assert!(owned.contains("sb_test"));
}
#[tokio::test]
async fn list_owned_ids_returns_none_for_legacy_session_resources() {
let session_id = SessionId::new();
let registry = Arc::new(TestSessionResourceRegistry::default());
registry
.register(RegisterSessionResource {
session_id,
resource_id: "resource_legacy".to_string(),
kind: "sandbox".to_string(),
display_name: "sandbox".to_string(),
status: SessionResourceStatus::Active,
metadata: json!({}),
})
.await
.unwrap();
let context = ToolContext::new(session_id).with_session_resource_registry(registry);
let owned = list_owned_external_resource_ids(&context, "daytona", "sandbox")
.await
.unwrap();
assert!(owned.is_none());
}
}