1use crate::grpc::RuntimeAgentService;
2use crate::grpc::error::GrpcError;
3type Result<T> = std::result::Result<T, GrpcError>;
4use std::sync::Arc;
5use steer_core::api::Client as ApiClient;
6use steer_core::app::domain::runtime::RuntimeService;
7use steer_core::app::domain::session::{InMemoryEventStore, SessionCatalog};
8use steer_core::catalog::CatalogConfig;
9use steer_core::config::model::ModelId;
10use steer_core::tools::ToolSystemBuilder;
11use steer_proto::agent::v1::agent_service_server::AgentServiceServer;
12use steer_workspace::{LocalEnvironmentManager, LocalWorkspaceManager, RepoManager};
13use tokio::sync::oneshot;
14use tonic::transport::{Channel, Server};
15
16pub async fn create_local_channel(
17 runtime_service: &RuntimeService,
18 catalog: Arc<dyn SessionCatalog>,
19 model_registry: Arc<steer_core::model_registry::ModelRegistry>,
20 provider_registry: Arc<steer_core::auth::ProviderRegistry>,
21 llm_config_provider: steer_core::config::LlmConfigProvider,
22 environment_root: std::path::PathBuf,
23) -> Result<(Channel, tokio::task::JoinHandle<()>)> {
24 let (tx, rx) = oneshot::channel();
25
26 let workspace_manager = Arc::new(
27 LocalWorkspaceManager::new(environment_root.clone())
28 .await
29 .map_err(|e| GrpcError::InvalidSessionState {
30 reason: format!("Failed to create workspace manager: {e}"),
31 })?,
32 );
33 let repo_manager: Arc<dyn RepoManager> = workspace_manager.clone();
34 let environment_manager = Arc::new(LocalEnvironmentManager::new(environment_root));
35
36 let service = RuntimeAgentService::new(crate::grpc::RuntimeAgentDeps {
37 runtime: runtime_service.handle(),
38 catalog,
39 llm_config_provider,
40 model_registry,
41 provider_registry,
42 environment_manager,
43 workspace_manager,
44 repo_manager,
45 });
46 let svc = AgentServiceServer::new(service);
47
48 let server_handle: tokio::task::JoinHandle<()> = tokio::spawn(async move {
49 let addr: std::net::SocketAddr = match "127.0.0.1:0".parse() {
50 Ok(addr) => addr,
51 Err(error) => {
52 tracing::error!(error = %error, "Failed to parse localhost address");
53 return;
54 }
55 };
56 let listener = match tokio::net::TcpListener::bind(addr).await {
57 Ok(listener) => listener,
58 Err(error) => {
59 tracing::error!(error = %error, "Failed to bind localhost listener");
60 return;
61 }
62 };
63 let local_addr = match listener.local_addr() {
64 Ok(local_addr) => local_addr,
65 Err(error) => {
66 tracing::error!(error = %error, "Failed to read localhost address");
67 return;
68 }
69 };
70
71 if tx.send(local_addr).is_err() {
72 tracing::warn!("Failed to send localhost address to channel");
73 return;
74 }
75
76 if let Err(error) = Server::builder()
77 .add_service(svc)
78 .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
79 .await
80 {
81 tracing::error!(error = %error, "Failed to run localhost server");
82 }
83 });
84
85 let addr = rx
86 .await
87 .map_err(|e| GrpcError::ChannelError(format!("Failed to receive server address: {e}")))?;
88
89 let endpoint =
90 tonic::transport::Endpoint::try_from(format!("http://{addr}"))?.tcp_nodelay(true);
91 let channel = endpoint.connect().await?;
92
93 Ok((channel, server_handle))
94}
95
96pub struct LocalGrpcSetup {
97 pub channel: Channel,
98 pub server_handle: tokio::task::JoinHandle<()>,
99 pub runtime_service: RuntimeService,
100}
101
102pub async fn setup_local_grpc_with_catalog(
103 _default_model: ModelId,
104 session_db_path: Option<std::path::PathBuf>,
105 catalog_config: CatalogConfig,
106 workspace_root: Option<std::path::PathBuf>,
107) -> Result<LocalGrpcSetup> {
108 let (event_store, catalog): (
109 Arc<dyn steer_core::app::domain::session::EventStore>,
110 Arc<dyn SessionCatalog>,
111 ) = if let Some(db_path) = session_db_path {
112 let sqlite_store = Arc::new(
113 steer_core::app::domain::session::SqliteEventStore::new(&db_path)
114 .await
115 .map_err(|e| GrpcError::InvalidSessionState {
116 reason: format!("Failed to create event store: {e}"),
117 })?,
118 );
119 (sqlite_store.clone(), sqlite_store)
120 } else {
121 let in_memory_store = Arc::new(InMemoryEventStore::new());
122 (in_memory_store.clone(), in_memory_store)
123 };
124
125 let model_registry = Arc::new(
126 steer_core::model_registry::ModelRegistry::load(&catalog_config.catalog_paths)
127 .map_err(GrpcError::CoreError)?,
128 );
129
130 let provider_registry = Arc::new(
131 steer_core::auth::ProviderRegistry::load(&catalog_config.catalog_paths)
132 .map_err(GrpcError::CoreError)?,
133 );
134
135 #[cfg(not(test))]
136 let auth_storage = std::sync::Arc::new(
137 steer_core::auth::DefaultAuthStorage::new().map_err(|e| GrpcError::CoreError(e.into()))?,
138 );
139
140 #[cfg(test)]
141 let auth_storage = std::sync::Arc::new(steer_core::test_utils::InMemoryAuthStorage::new());
142
143 let llm_config_provider =
144 steer_core::config::LlmConfigProvider::new(auth_storage).map_err(GrpcError::CoreError)?;
145
146 let api_client = Arc::new(ApiClient::new_with_deps(
147 llm_config_provider.clone(),
148 provider_registry.clone(),
149 model_registry.clone(),
150 ));
151
152 let workspace_path = workspace_root.unwrap_or_else(|| {
153 std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."))
154 });
155 let environment_root = steer_core::utils::paths::AppPaths::local_environment_root();
156 let workspace =
157 steer_core::workspace::create_workspace(&steer_core::workspace::WorkspaceConfig::Local {
158 path: workspace_path.clone(),
159 })
160 .await
161 .map_err(|e| GrpcError::InvalidSessionState {
162 reason: format!("Failed to create workspace: {e}"),
163 })?;
164 let workspace_manager = Arc::new(
165 LocalWorkspaceManager::new(environment_root.clone())
166 .await
167 .map_err(|e| GrpcError::InvalidSessionState {
168 reason: format!("Failed to create workspace manager: {e}"),
169 })?,
170 );
171 let repo_manager: Arc<dyn RepoManager> = workspace_manager.clone();
172
173 let tool_executor = ToolSystemBuilder::new(
174 workspace,
175 event_store.clone(),
176 api_client.clone(),
177 model_registry.clone(),
178 )
179 .with_workspace_manager(workspace_manager)
180 .with_repo_manager(repo_manager)
181 .build();
182
183 let runtime_service = RuntimeService::spawn(event_store, api_client, tool_executor);
184
185 let (channel, server_handle) = create_local_channel(
186 &runtime_service,
187 catalog,
188 model_registry,
189 provider_registry,
190 llm_config_provider,
191 environment_root.clone(),
192 )
193 .await?;
194
195 Ok(LocalGrpcSetup {
196 channel,
197 server_handle,
198 runtime_service,
199 })
200}
201
202pub async fn setup_local_grpc(
203 default_model: ModelId,
204 session_db_path: Option<std::path::PathBuf>,
205 workspace_root: Option<std::path::PathBuf>,
206) -> Result<(Channel, tokio::task::JoinHandle<()>)> {
207 let setup = setup_local_grpc_with_catalog(
208 default_model,
209 session_db_path,
210 CatalogConfig::default(),
211 workspace_root,
212 )
213 .await?;
214 Ok((setup.channel, setup.server_handle))
215}
216#[cfg(test)]
217mod tests {
218 use super::*;
219 use steer_core::api::error::ApiError;
220 use steer_core::api::provider::{CompletionResponse, Provider};
221 use steer_core::app::conversation::AssistantContent;
222 use steer_core::app::domain::action::Action;
223 use steer_core::app::domain::event::SessionEvent;
224 use steer_core::app::domain::session::EventStore;
225 use steer_core::app::domain::session::SqliteEventStore;
226 use steer_core::app::domain::types::OpId;
227 use steer_core::config::model::ModelId;
228 use steer_proto::agent::v1::{
229 CompactSessionRequest, ExecuteBashCommandRequest, ListModelsRequest, SendMessageRequest,
230 SubscribeSessionEventsRequest, agent_service_client::AgentServiceClient,
231 };
232 use tempfile::TempDir;
233 use tokio::time::{Duration, Instant, timeout};
234 use tokio_util::sync::CancellationToken;
235 use tonic::Code;
236
237 const STUB_RESPONSE: &str = "stub response";
238
239 #[derive(Clone)]
240 struct StubProvider;
241
242 #[async_trait::async_trait]
243 impl Provider for StubProvider {
244 fn name(&self) -> &'static str {
245 "stub"
246 }
247
248 async fn complete(
249 &self,
250 _model_id: &ModelId,
251 _messages: Vec<steer_core::app::conversation::Message>,
252 _system: Option<steer_core::app::SystemContext>,
253 _tools: Option<Vec<steer_tools::ToolSchema>>,
254 _call_options: Option<steer_core::config::model::ModelParameters>,
255 _token: CancellationToken,
256 ) -> std::result::Result<CompletionResponse, ApiError> {
257 Ok(CompletionResponse {
258 content: vec![AssistantContent::Text {
259 text: STUB_RESPONSE.to_string(),
260 }],
261 usage: None,
262 })
263 }
264 }
265
266 async fn setup_local_grpc_with_stub_provider(
267 default_model: ModelId,
268 workspace_root: std::path::PathBuf,
269 ) -> Result<LocalGrpcSetup> {
270 let in_memory_store = Arc::new(InMemoryEventStore::new());
271 let event_store: Arc<dyn steer_core::app::domain::session::EventStore> =
272 in_memory_store.clone();
273 let catalog: Arc<dyn SessionCatalog> = in_memory_store;
274 let catalog_config = CatalogConfig::default();
275
276 let model_registry = Arc::new(
277 steer_core::model_registry::ModelRegistry::load(&catalog_config.catalog_paths)
278 .map_err(GrpcError::CoreError)?,
279 );
280 let provider_registry = Arc::new(
281 steer_core::auth::ProviderRegistry::load(&catalog_config.catalog_paths)
282 .map_err(GrpcError::CoreError)?,
283 );
284
285 let auth_storage = Arc::new(steer_core::test_utils::InMemoryAuthStorage::new());
286 let llm_config_provider = steer_core::config::LlmConfigProvider::new(auth_storage)
287 .map_err(GrpcError::CoreError)?;
288
289 let api_client = Arc::new(ApiClient::new_with_deps(
290 llm_config_provider.clone(),
291 provider_registry.clone(),
292 model_registry.clone(),
293 ));
294 api_client.insert_test_provider(default_model.provider.clone(), Arc::new(StubProvider));
295
296 let workspace = steer_core::workspace::create_workspace(
297 &steer_core::workspace::WorkspaceConfig::Local {
298 path: workspace_root.clone(),
299 },
300 )
301 .await
302 .map_err(|e| GrpcError::InvalidSessionState {
303 reason: format!("Failed to create workspace: {e}"),
304 })?;
305
306 let environment_root = workspace_root.join(".steer-env");
307 let workspace_manager = Arc::new(
308 LocalWorkspaceManager::new(environment_root.clone())
309 .await
310 .map_err(|e| GrpcError::InvalidSessionState {
311 reason: format!("Failed to create workspace manager: {e}"),
312 })?,
313 );
314 let repo_manager: Arc<dyn RepoManager> = workspace_manager.clone();
315
316 let tool_executor = ToolSystemBuilder::new(
317 workspace,
318 event_store.clone(),
319 api_client.clone(),
320 model_registry.clone(),
321 )
322 .with_workspace_manager(workspace_manager)
323 .with_repo_manager(repo_manager)
324 .build();
325
326 let runtime_service = RuntimeService::spawn(event_store, api_client, tool_executor);
327
328 let (channel, server_handle) = create_local_channel(
329 &runtime_service,
330 catalog,
331 model_registry,
332 provider_registry,
333 llm_config_provider,
334 environment_root,
335 )
336 .await?;
337
338 Ok(LocalGrpcSetup {
339 channel,
340 server_handle,
341 runtime_service,
342 })
343 }
344
345 async fn setup_local_grpc_with_stub_provider_and_store(
346 default_model: ModelId,
347 workspace_root: std::path::PathBuf,
348 session_db_path: Option<std::path::PathBuf>,
349 ) -> Result<LocalGrpcSetup> {
350 let (event_store, catalog): (
351 Arc<dyn steer_core::app::domain::session::EventStore>,
352 Arc<dyn SessionCatalog>,
353 ) = if let Some(db_path) = session_db_path {
354 let sqlite_store = Arc::new(
355 steer_core::app::domain::session::SqliteEventStore::new(&db_path)
356 .await
357 .map_err(|e| GrpcError::InvalidSessionState {
358 reason: format!("Failed to create event store: {e}"),
359 })?,
360 );
361 (sqlite_store.clone(), sqlite_store)
362 } else {
363 let in_memory_store = Arc::new(InMemoryEventStore::new());
364 (in_memory_store.clone(), in_memory_store)
365 };
366
367 let catalog_config = CatalogConfig::default();
368 let model_registry = Arc::new(
369 steer_core::model_registry::ModelRegistry::load(&catalog_config.catalog_paths)
370 .map_err(GrpcError::CoreError)?,
371 );
372 let provider_registry = Arc::new(
373 steer_core::auth::ProviderRegistry::load(&catalog_config.catalog_paths)
374 .map_err(GrpcError::CoreError)?,
375 );
376
377 let auth_storage = Arc::new(steer_core::test_utils::InMemoryAuthStorage::new());
378 let llm_config_provider = steer_core::config::LlmConfigProvider::new(auth_storage)
379 .map_err(GrpcError::CoreError)?;
380
381 let api_client = Arc::new(ApiClient::new_with_deps(
382 llm_config_provider.clone(),
383 provider_registry.clone(),
384 model_registry.clone(),
385 ));
386 api_client.insert_test_provider(default_model.provider.clone(), Arc::new(StubProvider));
387
388 let workspace = steer_core::workspace::create_workspace(
389 &steer_core::workspace::WorkspaceConfig::Local {
390 path: workspace_root.clone(),
391 },
392 )
393 .await
394 .map_err(|e| GrpcError::InvalidSessionState {
395 reason: format!("Failed to create workspace: {e}"),
396 })?;
397
398 let environment_root = workspace_root.join(".steer-env");
399 let workspace_manager = Arc::new(
400 LocalWorkspaceManager::new(environment_root.clone())
401 .await
402 .map_err(|e| GrpcError::InvalidSessionState {
403 reason: format!("Failed to create workspace manager: {e}"),
404 })?,
405 );
406 let repo_manager: Arc<dyn RepoManager> = workspace_manager.clone();
407
408 let tool_executor = ToolSystemBuilder::new(
409 workspace,
410 event_store.clone(),
411 api_client.clone(),
412 model_registry.clone(),
413 )
414 .with_workspace_manager(workspace_manager)
415 .with_repo_manager(repo_manager)
416 .build();
417
418 let runtime_service = RuntimeService::spawn(event_store, api_client, tool_executor);
419
420 let (channel, server_handle) = create_local_channel(
421 &runtime_service,
422 catalog,
423 model_registry,
424 provider_registry,
425 llm_config_provider,
426 environment_root,
427 )
428 .await?;
429
430 Ok(LocalGrpcSetup {
431 channel,
432 server_handle,
433 runtime_service,
434 })
435 }
436
437 fn test_workspace_root() -> TempDir {
438 TempDir::new().expect("workspace tempdir")
439 }
440
441 async fn next_event(
442 stream: &mut tonic::Streaming<steer_proto::agent::v1::SessionEvent>,
443 ) -> steer_proto::agent::v1::SessionEvent {
444 timeout(Duration::from_secs(5), stream.message())
445 .await
446 .expect("timeout")
447 .expect("stream ok")
448 .expect("event")
449 }
450
451 async fn wait_for_processing_completed(
452 stream: &mut tonic::Streaming<steer_proto::agent::v1::SessionEvent>,
453 op_id: &str,
454 ) {
455 loop {
456 let event = next_event(stream).await;
457 if matches!(
458 event.event,
459 Some(steer_proto::agent::v1::session_event::Event::ProcessingCompleted(
460 ref e
461 )) if e.op_id == op_id
462 ) {
463 break;
464 }
465 }
466 }
467
468 fn proto_event_kind(event: &steer_proto::agent::v1::SessionEvent) -> &'static str {
469 use steer_proto::agent::v1::session_event::Event;
470
471 match event.event {
472 Some(Event::AssistantMessageAdded(_)) => "AssistantMessageAdded",
473 Some(Event::UserMessageAdded(_)) => "UserMessageAdded",
474 Some(Event::ToolMessageAdded(_)) => "ToolMessageAdded",
475 Some(Event::MessageUpdated(_)) => "MessageUpdated",
476 Some(Event::ToolCallStarted(_)) => "ToolCallStarted",
477 Some(Event::ToolCallCompleted(_)) => "ToolCallCompleted",
478 Some(Event::ToolCallFailed(_)) => "ToolCallFailed",
479 Some(Event::ProcessingStarted(_)) => "ProcessingStarted",
480 Some(Event::ProcessingCompleted(_)) => "ProcessingCompleted",
481 Some(Event::RequestToolApproval(_)) => "RequestToolApproval",
482 Some(Event::OperationCancelled(_)) => "OperationCancelled",
483 Some(Event::Error(_)) => "Error",
484 Some(Event::WorkspaceChanged(_)) => "WorkspaceChanged",
485 Some(Event::ConversationCompacted(_)) => "ConversationCompacted",
486 Some(Event::StreamDelta(_)) => "StreamDelta",
487 Some(Event::CompactResult(_)) => "CompactResult",
488 Some(Event::McpServerStateChanged(_)) => "McpServerStateChanged",
489 Some(Event::SessionConfigUpdated(_)) => "SessionConfigUpdated",
490 Some(Event::QueueUpdated(_)) => "QueueUpdated",
491 Some(Event::LlmUsageUpdated(_)) => "LlmUsageUpdated",
492 None => "None",
493 }
494 }
495
496 fn core_event_kind(event: &SessionEvent) -> &'static str {
497 match event {
498 SessionEvent::SessionCreated { .. } => "SessionCreated",
499 SessionEvent::SessionConfigUpdated { .. } => "SessionConfigUpdated",
500 SessionEvent::AssistantMessageAdded { .. } => "AssistantMessageAdded",
501 SessionEvent::UserMessageAdded { .. } => "UserMessageAdded",
502 SessionEvent::ToolMessageAdded { .. } => "ToolMessageAdded",
503 SessionEvent::MessageUpdated { .. } => "MessageUpdated",
504 SessionEvent::ToolCallStarted { .. } => "ToolCallStarted",
505 SessionEvent::ToolCallCompleted { .. } => "ToolCallCompleted",
506 SessionEvent::ToolCallFailed { .. } => "ToolCallFailed",
507 SessionEvent::ApprovalRequested { .. } => "ApprovalRequested",
508 SessionEvent::ApprovalDecided { .. } => "ApprovalDecided",
509 SessionEvent::OperationStarted { .. } => "OperationStarted",
510 SessionEvent::OperationCompleted { .. } => "OperationCompleted",
511 SessionEvent::OperationCancelled { .. } => "OperationCancelled",
512 SessionEvent::LlmUsageUpdated { .. } => "LlmUsageUpdated",
513 SessionEvent::CompactResult { .. } => "CompactResult",
514 SessionEvent::ConversationCompacted { .. } => "ConversationCompacted",
515 SessionEvent::WorkspaceChanged => "WorkspaceChanged",
516 SessionEvent::QueueUpdated { .. } => "QueueUpdated",
517 SessionEvent::Error { .. } => "Error",
518 SessionEvent::McpServerStateChanged { .. } => "McpServerStateChanged",
519 }
520 }
521
522 #[tokio::test]
523 async fn test_since_sequence_replay_returns_persisted_events() {
524 let workspace_root = test_workspace_root();
525 let setup = setup_local_grpc_with_catalog(
526 steer_core::config::model::builtin::claude_sonnet_4_5(),
527 None,
528 CatalogConfig::default(),
529 Some(workspace_root.path().to_path_buf()),
530 )
531 .await
532 .expect("local grpc setup");
533
534 let session_id = setup
535 .runtime_service
536 .handle()
537 .create_session(steer_core::test_utils::read_only_session_config(
538 steer_core::config::model::builtin::claude_sonnet_4_5(),
539 ))
540 .await
541 .expect("create session");
542
543 let op_id = OpId::new();
544 setup
545 .runtime_service
546 .handle()
547 .dispatch_action(
548 session_id,
549 Action::ModelResponseError {
550 session_id,
551 op_id,
552 error: "boom".to_string(),
553 },
554 )
555 .await
556 .expect("dispatch action");
557
558 let mut client = AgentServiceClient::new(setup.channel.clone());
559 let request = tonic::Request::new(SubscribeSessionEventsRequest {
560 session_id: session_id.to_string(),
561 since_sequence: Some(0),
562 });
563
564 let mut stream = client
565 .subscribe_session_events(request)
566 .await
567 .expect("subscribe")
568 .into_inner();
569
570 let mut events = Vec::new();
571 for _ in 0..2 {
572 let event = timeout(Duration::from_secs(2), stream.message())
573 .await
574 .expect("timeout")
575 .expect("stream ok")
576 .expect("event");
577 events.push(event);
578 }
579
580 assert!(events.iter().any(|evt| matches!(
581 evt.event,
582 Some(steer_proto::agent::v1::session_event::Event::Error(_))
583 )));
584 assert!(events.iter().any(|evt| matches!(
585 evt.event,
586 Some(steer_proto::agent::v1::session_event::Event::ProcessingCompleted(_))
587 )));
588 }
589
590 #[tokio::test]
591 async fn test_compaction_flow_end_to_end() {
592 let workspace_root = test_workspace_root();
593 let model = steer_core::config::model::builtin::claude_sonnet_4_5();
594 let setup =
595 setup_local_grpc_with_stub_provider(model.clone(), workspace_root.path().to_path_buf())
596 .await
597 .expect("local grpc setup");
598
599 let session_id = setup
600 .runtime_service
601 .handle()
602 .create_session(steer_core::test_utils::read_only_session_config(
603 steer_core::config::model::builtin::claude_sonnet_4_5(),
604 ))
605 .await
606 .expect("create session");
607
608 let mut event_client = AgentServiceClient::new(setup.channel.clone());
609 let mut action_client = AgentServiceClient::new(setup.channel.clone());
610
611 let request = tonic::Request::new(SubscribeSessionEventsRequest {
612 session_id: session_id.to_string(),
613 since_sequence: None,
614 });
615
616 let mut stream = event_client
617 .subscribe_session_events(request)
618 .await
619 .expect("subscribe")
620 .into_inner();
621
622 let model_spec = crate::grpc::conversions::model_to_proto(model.clone());
623
624 let first_response = action_client
625 .send_message(tonic::Request::new(SendMessageRequest {
626 session_id: session_id.to_string(),
627 message: "first".to_string(),
628 content: Vec::new(),
629 model: Some(model_spec.clone()),
630 }))
631 .await
632 .expect("send_message");
633 let first_op = first_response.into_inner().operation.expect("operation");
634 wait_for_processing_completed(&mut stream, &first_op.id).await;
635
636 let second_response = action_client
637 .send_message(tonic::Request::new(SendMessageRequest {
638 session_id: session_id.to_string(),
639 message: "second".to_string(),
640 content: Vec::new(),
641 model: Some(model_spec.clone()),
642 }))
643 .await
644 .expect("send_message");
645 let second_op = second_response.into_inner().operation.expect("operation");
646 wait_for_processing_completed(&mut stream, &second_op.id).await;
647
648 action_client
649 .compact_session(tonic::Request::new(CompactSessionRequest {
650 session_id: session_id.to_string(),
651 model: Some(model_spec),
652 }))
653 .await
654 .expect("compact_session");
655
656 let mut compact_summary = None;
657 let mut compaction_record = None;
658
659 while compact_summary.is_none() || compaction_record.is_none() {
660 let event = next_event(&mut stream).await;
661 match event.event {
662 Some(steer_proto::agent::v1::session_event::Event::CompactResult(e)) => {
663 let result = e.result.expect("compact result");
664 match result.result {
665 Some(steer_proto::agent::v1::compact_result::Result::Success(success)) => {
666 compact_summary = Some(success.summary);
667 }
668 other => panic!("unexpected compact result: {other:?}"),
669 }
670 }
671 Some(steer_proto::agent::v1::session_event::Event::ConversationCompacted(e)) => {
672 compaction_record = Some(e.record.expect("compaction record"));
673 }
674 _ => {}
675 }
676 }
677
678 assert_eq!(compact_summary.expect("summary"), STUB_RESPONSE);
679 let record = compaction_record.expect("record");
680 assert!(!record.id.is_empty());
681 assert_eq!(record.model, model.id);
682 }
683
684 #[tokio::test]
685 async fn test_compaction_emits_processing_completed_with_sqlite_store() {
686 let workspace_root = test_workspace_root();
687 let db_path = workspace_root.path().join("sessions.db");
688 let model = steer_core::config::model::builtin::claude_sonnet_4_5();
689 let setup = setup_local_grpc_with_stub_provider_and_store(
690 model.clone(),
691 workspace_root.path().to_path_buf(),
692 Some(db_path.clone()),
693 )
694 .await
695 .expect("local grpc setup");
696
697 let session_id = setup
698 .runtime_service
699 .handle()
700 .create_session(steer_core::test_utils::read_only_session_config(
701 steer_core::config::model::builtin::claude_sonnet_4_5(),
702 ))
703 .await
704 .expect("create session");
705
706 let mut event_client = AgentServiceClient::new(setup.channel.clone());
707 let mut action_client = AgentServiceClient::new(setup.channel.clone());
708
709 let request = tonic::Request::new(SubscribeSessionEventsRequest {
710 session_id: session_id.to_string(),
711 since_sequence: None,
712 });
713
714 let mut stream = event_client
715 .subscribe_session_events(request)
716 .await
717 .expect("subscribe")
718 .into_inner();
719
720 let sqlite_store = SqliteEventStore::new(&db_path)
721 .await
722 .expect("open sqlite store");
723 let before_len = sqlite_store
724 .load_events(session_id)
725 .await
726 .expect("load events")
727 .len();
728
729 let model_spec = crate::grpc::conversions::model_to_proto(model.clone());
730
731 let first_response = action_client
732 .send_message(tonic::Request::new(SendMessageRequest {
733 session_id: session_id.to_string(),
734 message: "first".to_string(),
735 content: Vec::new(),
736 model: Some(model_spec.clone()),
737 }))
738 .await
739 .expect("send_message");
740 let first_op = first_response.into_inner().operation.expect("operation");
741 wait_for_processing_completed(&mut stream, &first_op.id).await;
742
743 let second_response = action_client
744 .send_message(tonic::Request::new(SendMessageRequest {
745 session_id: session_id.to_string(),
746 message: "second".to_string(),
747 content: Vec::new(),
748 model: Some(model_spec.clone()),
749 }))
750 .await
751 .expect("send_message");
752 let second_op = second_response.into_inner().operation.expect("operation");
753 wait_for_processing_completed(&mut stream, &second_op.id).await;
754
755 action_client
756 .compact_session(tonic::Request::new(CompactSessionRequest {
757 session_id: session_id.to_string(),
758 model: Some(model_spec),
759 }))
760 .await
761 .expect("compact_session");
762
763 let mut seen = Vec::new();
764 let mut compaction_op_id = None;
765 let mut completed = false;
766 let deadline = Instant::now() + Duration::from_secs(5);
767
768 while Instant::now() < deadline {
769 let remaining = deadline.saturating_duration_since(Instant::now());
770 let next = timeout(remaining, stream.message()).await;
771 let Ok(Ok(Some(event))) = next else { break };
772
773 seen.push(proto_event_kind(&event));
774
775 if let Some(steer_proto::agent::v1::session_event::Event::ProcessingStarted(ref e)) =
776 event.event
777 {
778 compaction_op_id = Some(e.op_id.clone());
779 }
780
781 if let Some(steer_proto::agent::v1::session_event::Event::ProcessingCompleted(e)) =
782 event.event
783 {
784 if compaction_op_id.as_deref() == Some(e.op_id.as_str()) {
785 completed = true;
786 break;
787 }
788 }
789 }
790
791 if !completed {
792 let persisted = sqlite_store
793 .load_events(session_id)
794 .await
795 .expect("load events");
796 let persisted_kinds: Vec<&'static str> = persisted
797 .iter()
798 .map(|(_, event)| core_event_kind(event))
799 .collect();
800
801 let after_len = persisted.len();
802 let new_events = persisted_kinds
803 .iter()
804 .skip(before_len)
805 .copied()
806 .collect::<Vec<_>>();
807
808 panic!(
809 "missing ProcessingCompleted for compaction; seen={seen:?} \
810persisted_new={new_events:?} (before_len={before_len}, after_len={after_len})"
811 );
812 }
813 }
814
815 #[tokio::test]
816 async fn test_send_message_uses_session_default_model_when_not_specified() {
817 let workspace_root = test_workspace_root();
818 let setup = setup_local_grpc_with_catalog(
819 steer_core::config::model::builtin::claude_sonnet_4_5(),
820 None,
821 CatalogConfig::default(),
822 Some(workspace_root.path().to_path_buf()),
823 )
824 .await
825 .expect("local grpc setup");
826
827 let session_id = setup
828 .runtime_service
829 .handle()
830 .create_session(steer_core::test_utils::read_only_session_config(
831 steer_core::config::model::builtin::claude_sonnet_4_5(),
832 ))
833 .await
834 .expect("create session");
835
836 let mut client = AgentServiceClient::new(setup.channel.clone());
837 let request = tonic::Request::new(SendMessageRequest {
838 session_id: session_id.to_string(),
839 message: "hello".to_string(),
840 content: Vec::new(),
841 model: None,
842 });
843
844 let response = client
845 .send_message(request)
846 .await
847 .expect("send_message should succeed using session default model")
848 .into_inner();
849
850 assert!(
851 response.operation.is_some(),
852 "Response should contain an operation"
853 );
854 }
855
856 #[tokio::test]
857 async fn test_compact_session_requires_model_spec() {
858 let workspace_root = test_workspace_root();
859 let setup = setup_local_grpc_with_catalog(
860 steer_core::config::model::builtin::claude_sonnet_4_5(),
861 None,
862 CatalogConfig::default(),
863 Some(workspace_root.path().to_path_buf()),
864 )
865 .await
866 .expect("local grpc setup");
867
868 let session_id = setup
869 .runtime_service
870 .handle()
871 .create_session(steer_core::test_utils::read_only_session_config(
872 steer_core::config::model::builtin::claude_sonnet_4_5(),
873 ))
874 .await
875 .expect("create session");
876
877 let mut client = AgentServiceClient::new(setup.channel.clone());
878 let request = tonic::Request::new(CompactSessionRequest {
879 session_id: session_id.to_string(),
880 model: None,
881 });
882
883 let err = client
884 .compact_session(request)
885 .await
886 .expect_err("compact_session should fail without model");
887 assert_eq!(err.code(), Code::InvalidArgument);
888 }
889
890 #[tokio::test]
891 async fn test_list_models_sets_context_window_tokens_field() {
892 let workspace_root = test_workspace_root();
893 let setup = setup_local_grpc_with_catalog(
894 steer_core::config::model::builtin::claude_sonnet_4_5(),
895 None,
896 CatalogConfig::default(),
897 Some(workspace_root.path().to_path_buf()),
898 )
899 .await
900 .expect("local grpc setup");
901
902 let mut client = AgentServiceClient::new(setup.channel.clone());
903 let response = client
904 .list_models(tonic::Request::new(ListModelsRequest { provider_id: None }))
905 .await
906 .expect("list_models should succeed")
907 .into_inner();
908
909 assert!(
910 !response.models.is_empty(),
911 "Expected at least one recommended model"
912 );
913 assert!(
914 response
915 .models
916 .iter()
917 .all(|model| model.context_window_tokens.is_some()),
918 "expected context_window_tokens to be populated for all listed models"
919 );
920 }
921
922 #[tokio::test]
923 async fn test_execute_bash_command_does_not_require_model_spec() {
924 let workspace_root = test_workspace_root();
925 let setup = setup_local_grpc_with_catalog(
926 steer_core::config::model::builtin::claude_sonnet_4_5(),
927 None,
928 CatalogConfig::default(),
929 Some(workspace_root.path().to_path_buf()),
930 )
931 .await
932 .expect("local grpc setup");
933
934 let session_id = setup
935 .runtime_service
936 .handle()
937 .create_session(steer_core::test_utils::read_only_session_config(
938 steer_core::config::model::builtin::claude_sonnet_4_5(),
939 ))
940 .await
941 .expect("create session");
942
943 let mut client = AgentServiceClient::new(setup.channel.clone());
944 let request = tonic::Request::new(ExecuteBashCommandRequest {
945 session_id: session_id.to_string(),
946 command: "echo hi".to_string(),
947 });
948
949 client
950 .execute_bash_command(request)
951 .await
952 .expect("execute_bash_command should succeed without model");
953 }
954}