use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use serde_json::Value;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use smooth_operator::access_control::AccessContext;
use smooth_operator::adapter::{
ConversationUpdate, MessagePage, MessageQuery, SessionUpdate, StorageAdapter,
};
use smooth_operator::domain::{Conversation, Message, Participant, Session};
use smooth_operator_adapter_memory::InMemoryStorageAdapter;
use smooth_operator_core::llm::StreamEvent;
use smooth_operator_core::llm_provider::MockLlmClient;
use smooth_operator_core::{CheckpointStore, KnowledgeBase, LlmConfig};
use smooth_operator_server::runner::{self, TurnRequest};
/// Organization id used as the tenant throughout these tests: it is set as the
/// `org_id` of every `TurnRequest` built by `run_turn_as`, attached to the
/// `AccessContext` in the org-scoped test, and is the value that test expects
/// to observe at `knowledge_for_access`.
const TURN_ORG: &str = "org-tenant-acme";
/// Builds the `LlmConfig` handed to every test turn: an OpenRouter config with
/// a placeholder API key, pinned to the `openai/gpt-4o` model.
///
/// NOTE(review): `run_turn_as` also supplies a `MockLlmClient` as
/// `llm_provider`, so this config is presumably never used for a real network
/// call — confirm against the runner.
fn mock_llm() -> LlmConfig {
    let base = LlmConfig::openrouter("not-a-real-key");
    base.with_model("openai/gpt-4o")
}
/// Shared slot recording what `knowledge_for_access` was called with.
///
/// * outer `None` — `knowledge_for_access` has not been called yet;
/// * `Some(org)` — it was called, and `org` is a copy of the
///   `organization_id` carried by the `AccessContext` it received.
type SeenOrg = Arc<Mutex<Option<Option<String>>>>;

/// Storage adapter that wraps an [`InMemoryStorageAdapter`] and records the
/// organization id seen by `knowledge_for_access`.
struct OrgRecordingAdapter {
    /// Backing store that every storage operation is forwarded to.
    inner: Arc<InMemoryStorageAdapter>,
    /// Recording slot; a second handle to it is returned by [`Self::new`].
    seen_org: SeenOrg,
}

impl OrgRecordingAdapter {
    /// Creates an adapter over a fresh in-memory store.
    ///
    /// Returns the adapter together with a handle to its recording slot, so
    /// the caller can still read the slot after the adapter has been moved
    /// elsewhere (e.g. into an `Arc<dyn StorageAdapter>`).
    fn new() -> (Self, SeenOrg) {
        let slot: SeenOrg = Arc::new(Mutex::new(None));
        let adapter = Self {
            inner: Arc::new(InMemoryStorageAdapter::new()),
            seen_org: Arc::clone(&slot),
        };
        (adapter, slot)
    }
}
#[async_trait]
impl StorageAdapter for OrgRecordingAdapter {
async fn create_conversation(
&self,
conversation: Conversation,
) -> anyhow::Result<Conversation> {
self.inner.create_conversation(conversation).await
}
async fn get_conversation(&self, id: &str) -> anyhow::Result<Option<Conversation>> {
self.inner.get_conversation(id).await
}
async fn list_conversations_by_org(
&self,
organization_id: &str,
) -> anyhow::Result<Vec<Conversation>> {
self.inner.list_conversations_by_org(organization_id).await
}
async fn update_conversation(
&self,
id: &str,
update: ConversationUpdate,
) -> anyhow::Result<Conversation> {
self.inner.update_conversation(id, update).await
}
async fn add_participant(&self, participant: Participant) -> anyhow::Result<Participant> {
self.inner.add_participant(participant).await
}
async fn get_participant(&self, id: &str) -> anyhow::Result<Option<Participant>> {
self.inner.get_participant(id).await
}
async fn list_participants_by_conversation(
&self,
conversation_id: &str,
) -> anyhow::Result<Vec<Participant>> {
self.inner
.list_participants_by_conversation(conversation_id)
.await
}
async fn resolve_participant_by_external_id(
&self,
conversation_id: &str,
external_id: &str,
) -> anyhow::Result<Option<Participant>> {
self.inner
.resolve_participant_by_external_id(conversation_id, external_id)
.await
}
async fn append_message(&self, message: Message) -> anyhow::Result<Message> {
self.inner.append_message(message).await
}
async fn get_message(&self, id: &str) -> anyhow::Result<Option<Message>> {
self.inner.get_message(id).await
}
async fn list_messages_by_conversation(
&self,
query: MessageQuery,
) -> anyhow::Result<MessagePage> {
self.inner.list_messages_by_conversation(query).await
}
async fn create_session(&self, session: Session) -> anyhow::Result<Session> {
self.inner.create_session(session).await
}
async fn get_session(&self, session_id: &str) -> anyhow::Result<Option<Session>> {
self.inner.get_session(session_id).await
}
async fn update_session(
&self,
session_id: &str,
update: SessionUpdate,
) -> anyhow::Result<Session> {
self.inner.update_session(session_id, update).await
}
async fn list_sessions_by_conversation(
&self,
conversation_id: &str,
) -> anyhow::Result<Vec<Session>> {
self.inner
.list_sessions_by_conversation(conversation_id)
.await
}
fn checkpoints(&self) -> Arc<dyn CheckpointStore> {
self.inner.checkpoints()
}
fn knowledge(&self) -> Arc<dyn KnowledgeBase> {
self.inner.knowledge()
}
fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
*self.seen_org.lock().unwrap() = Some(access.organization_id.clone());
self.inner.knowledge_for_access(access)
}
}
/// Collects every event from `rx`, in arrival order, until the channel is
/// closed and its buffer is empty.
///
/// `recv` hands out already-buffered messages before it ever reports closure,
/// so a single loop is sufficient; no separate non-blocking `try_recv` pass is
/// needed to pick up events that were sent before this function was called.
///
/// This future only completes once every sender for the channel has been
/// dropped (or the channel has otherwise been closed); callers must drop their
/// sender first.
async fn drain(mut rx: UnboundedReceiver<Value>) -> Vec<Value> {
    let mut events = Vec::new();
    while let Some(event) = rx.recv().await {
        events.push(event);
    }
    events
}
/// Drives one streaming turn against `storage` on behalf of `access`, using a
/// scripted mock LLM.
///
/// The mock is loaded with two streams, in order:
/// 1. a `knowledge_search` tool call whose arguments are `{"query":"alpha"}`,
///    finishing with reason `tool_calls`;
/// 2. a plain text answer finishing with reason `stop`.
///
/// The request's `org_id` is always `TURN_ORG`, independent of what `access`
/// carries. All events emitted on the turn's channel are drained and
/// discarded.
///
/// Panics if `run_streaming_turn` does not succeed.
async fn run_turn_as(storage: Arc<dyn StorageAdapter>, access: AccessContext) {
    // First scripted stream: the model asks for a knowledge search.
    let tool_call_round = vec![
        StreamEvent::ToolCallStart {
            index: 0,
            id: "call_1".into(),
            name: "knowledge_search".into(),
        },
        StreamEvent::ToolCallArgumentsDelta {
            index: 0,
            arguments_chunk: r#"{"query":"alpha"}"#.into(),
        },
        StreamEvent::Done {
            finish_reason: "tool_calls".into(),
        },
    ];
    // Second scripted stream: the model produces its final answer.
    let answer_round = vec![
        StreamEvent::Delta {
            content: "Here is what I found.".into(),
        },
        StreamEvent::Done {
            finish_reason: "stop".into(),
        },
    ];

    let mock = MockLlmClient::new();
    mock.push_stream(tool_call_round).push_stream(answer_round);

    let (tx, rx) = unbounded_channel::<Value>();

    let request = TurnRequest {
        storage,
        llm: mock_llm(),
        max_iterations: 4,
        conversation_id: "conv-org-scope",
        request_id: "req-1",
        user_message: "Tell me about alpha",
        access,
        llm_provider: Some(Arc::new(mock.clone())),
        reranker: None,
        confirmation: None,
        interactions: None,
        tool_provider: None,
        system_prompt: None,
        org_id: Some(TURN_ORG.to_string()),
        gateway_key: None,
        workflow: None,
        judge: None,
        greeting_section: None,
        enabled_tools: None,
        auth_gate: None,
        tool_configs: None,
        extensions: None,
    };

    runner::run_streaming_turn(request, &tx)
        .await
        .expect("run_streaming_turn");

    // Release our sender before draining so the channel can close.
    drop(tx);
    let _ = drain(rx).await;
}
#[tokio::test]
async fn turn_org_reaches_knowledge_for_access() {
let (adapter, seen_org) = OrgRecordingAdapter::new();
let storage: Arc<dyn StorageAdapter> = Arc::new(adapter);
let access = AccessContext::new(Some("user-1".into()), vec!["eng".into()])
.with_organization_id(TURN_ORG);
run_turn_as(storage, access).await;
let observed = seen_org
.lock()
.unwrap()
.clone()
.expect("knowledge_for_access must have been called during the turn");
assert_eq!(
observed,
Some(TURN_ORG.to_string()),
"the turn's org must reach the storage adapter's knowledge_for_access seam"
);
}
#[tokio::test]
async fn no_org_context_is_none_at_knowledge_seam() {
let (adapter, seen_org) = OrgRecordingAdapter::new();
let storage: Arc<dyn StorageAdapter> = Arc::new(adapter);
run_turn_as(storage, AccessContext::anonymous()).await;
let observed = seen_org
.lock()
.unwrap()
.clone()
.expect("knowledge_for_access must have been called during the turn");
assert_eq!(
observed, None,
"an org-less context must reach the knowledge seam as None (single-tenant default)"
);
}