Skip to main content

steer_grpc/
local_server.rs

1use crate::grpc::RuntimeAgentService;
2use crate::grpc::error::GrpcError;
3type Result<T> = std::result::Result<T, GrpcError>;
4use std::sync::Arc;
5use steer_core::api::Client as ApiClient;
6use steer_core::app::domain::runtime::RuntimeService;
7use steer_core::app::domain::session::{InMemoryEventStore, SessionCatalog};
8use steer_core::catalog::CatalogConfig;
9use steer_core::config::model::ModelId;
10use steer_core::tools::ToolSystemBuilder;
11use steer_proto::agent::v1::agent_service_server::AgentServiceServer;
12use steer_workspace::{LocalEnvironmentManager, LocalWorkspaceManager, RepoManager};
13use tokio::sync::oneshot;
14use tonic::transport::{Channel, Server};
15
16pub async fn create_local_channel(
17    runtime_service: &RuntimeService,
18    catalog: Arc<dyn SessionCatalog>,
19    model_registry: Arc<steer_core::model_registry::ModelRegistry>,
20    provider_registry: Arc<steer_core::auth::ProviderRegistry>,
21    llm_config_provider: steer_core::config::LlmConfigProvider,
22    environment_root: std::path::PathBuf,
23) -> Result<(Channel, tokio::task::JoinHandle<()>)> {
24    let (tx, rx) = oneshot::channel();
25
26    let workspace_manager = Arc::new(
27        LocalWorkspaceManager::new(environment_root.clone())
28            .await
29            .map_err(|e| GrpcError::InvalidSessionState {
30                reason: format!("Failed to create workspace manager: {e}"),
31            })?,
32    );
33    let repo_manager: Arc<dyn RepoManager> = workspace_manager.clone();
34    let environment_manager = Arc::new(LocalEnvironmentManager::new(environment_root));
35
36    let service = RuntimeAgentService::new(crate::grpc::RuntimeAgentDeps {
37        runtime: runtime_service.handle(),
38        catalog,
39        llm_config_provider,
40        model_registry,
41        provider_registry,
42        environment_manager,
43        workspace_manager,
44        repo_manager,
45    });
46    let svc = AgentServiceServer::new(service);
47
48    let server_handle: tokio::task::JoinHandle<()> = tokio::spawn(async move {
49        let addr: std::net::SocketAddr = match "127.0.0.1:0".parse() {
50            Ok(addr) => addr,
51            Err(error) => {
52                tracing::error!(error = %error, "Failed to parse localhost address");
53                return;
54            }
55        };
56        let listener = match tokio::net::TcpListener::bind(addr).await {
57            Ok(listener) => listener,
58            Err(error) => {
59                tracing::error!(error = %error, "Failed to bind localhost listener");
60                return;
61            }
62        };
63        let local_addr = match listener.local_addr() {
64            Ok(local_addr) => local_addr,
65            Err(error) => {
66                tracing::error!(error = %error, "Failed to read localhost address");
67                return;
68            }
69        };
70
71        if tx.send(local_addr).is_err() {
72            tracing::warn!("Failed to send localhost address to channel");
73            return;
74        }
75
76        if let Err(error) = Server::builder()
77            .add_service(svc)
78            .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
79            .await
80        {
81            tracing::error!(error = %error, "Failed to run localhost server");
82        }
83    });
84
85    let addr = rx
86        .await
87        .map_err(|e| GrpcError::ChannelError(format!("Failed to receive server address: {e}")))?;
88
89    let endpoint =
90        tonic::transport::Endpoint::try_from(format!("http://{addr}"))?.tcp_nodelay(true);
91    let channel = endpoint.connect().await?;
92
93    Ok((channel, server_handle))
94}
95
96pub struct LocalGrpcSetup {
97    pub channel: Channel,
98    pub server_handle: tokio::task::JoinHandle<()>,
99    pub runtime_service: RuntimeService,
100}
101
102pub async fn setup_local_grpc_with_catalog(
103    _default_model: ModelId,
104    session_db_path: Option<std::path::PathBuf>,
105    catalog_config: CatalogConfig,
106    workspace_root: Option<std::path::PathBuf>,
107) -> Result<LocalGrpcSetup> {
108    let (event_store, catalog): (
109        Arc<dyn steer_core::app::domain::session::EventStore>,
110        Arc<dyn SessionCatalog>,
111    ) = if let Some(db_path) = session_db_path {
112        let sqlite_store = Arc::new(
113            steer_core::app::domain::session::SqliteEventStore::new(&db_path)
114                .await
115                .map_err(|e| GrpcError::InvalidSessionState {
116                    reason: format!("Failed to create event store: {e}"),
117                })?,
118        );
119        (sqlite_store.clone(), sqlite_store)
120    } else {
121        let in_memory_store = Arc::new(InMemoryEventStore::new());
122        (in_memory_store.clone(), in_memory_store)
123    };
124
125    let model_registry = Arc::new(
126        steer_core::model_registry::ModelRegistry::load(&catalog_config.catalog_paths)
127            .map_err(GrpcError::CoreError)?,
128    );
129
130    let provider_registry = Arc::new(
131        steer_core::auth::ProviderRegistry::load(&catalog_config.catalog_paths)
132            .map_err(GrpcError::CoreError)?,
133    );
134
135    #[cfg(not(test))]
136    let auth_storage = std::sync::Arc::new(
137        steer_core::auth::DefaultAuthStorage::new().map_err(|e| GrpcError::CoreError(e.into()))?,
138    );
139
140    #[cfg(test)]
141    let auth_storage = std::sync::Arc::new(steer_core::test_utils::InMemoryAuthStorage::new());
142
143    let llm_config_provider =
144        steer_core::config::LlmConfigProvider::new(auth_storage).map_err(GrpcError::CoreError)?;
145
146    let api_client = Arc::new(ApiClient::new_with_deps(
147        llm_config_provider.clone(),
148        provider_registry.clone(),
149        model_registry.clone(),
150    ));
151
152    let workspace_path = workspace_root.unwrap_or_else(|| {
153        std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."))
154    });
155    let environment_root = steer_core::utils::paths::AppPaths::local_environment_root();
156    let workspace =
157        steer_core::workspace::create_workspace(&steer_core::workspace::WorkspaceConfig::Local {
158            path: workspace_path.clone(),
159        })
160        .await
161        .map_err(|e| GrpcError::InvalidSessionState {
162            reason: format!("Failed to create workspace: {e}"),
163        })?;
164    let workspace_manager = Arc::new(
165        LocalWorkspaceManager::new(environment_root.clone())
166            .await
167            .map_err(|e| GrpcError::InvalidSessionState {
168                reason: format!("Failed to create workspace manager: {e}"),
169            })?,
170    );
171    let repo_manager: Arc<dyn RepoManager> = workspace_manager.clone();
172
173    let tool_executor = ToolSystemBuilder::new(
174        workspace,
175        event_store.clone(),
176        api_client.clone(),
177        model_registry.clone(),
178    )
179    .with_workspace_manager(workspace_manager)
180    .with_repo_manager(repo_manager)
181    .build();
182
183    let runtime_service = RuntimeService::spawn(event_store, api_client, tool_executor);
184
185    let (channel, server_handle) = create_local_channel(
186        &runtime_service,
187        catalog,
188        model_registry,
189        provider_registry,
190        llm_config_provider,
191        environment_root.clone(),
192    )
193    .await?;
194
195    Ok(LocalGrpcSetup {
196        channel,
197        server_handle,
198        runtime_service,
199    })
200}
201
202pub async fn setup_local_grpc(
203    default_model: ModelId,
204    session_db_path: Option<std::path::PathBuf>,
205    workspace_root: Option<std::path::PathBuf>,
206) -> Result<(Channel, tokio::task::JoinHandle<()>)> {
207    let setup = setup_local_grpc_with_catalog(
208        default_model,
209        session_db_path,
210        CatalogConfig::default(),
211        workspace_root,
212    )
213    .await?;
214    Ok((setup.channel, setup.server_handle))
215}
216#[cfg(test)]
217mod tests {
218    use super::*;
219    use steer_core::api::error::ApiError;
220    use steer_core::api::provider::{CompletionResponse, Provider};
221    use steer_core::app::conversation::AssistantContent;
222    use steer_core::app::domain::action::Action;
223    use steer_core::app::domain::event::SessionEvent;
224    use steer_core::app::domain::session::EventStore;
225    use steer_core::app::domain::session::SqliteEventStore;
226    use steer_core::app::domain::types::OpId;
227    use steer_core::config::model::ModelId;
228    use steer_proto::agent::v1::{
229        CompactSessionRequest, ExecuteBashCommandRequest, ListModelsRequest, SendMessageRequest,
230        SubscribeSessionEventsRequest, agent_service_client::AgentServiceClient,
231    };
232    use tempfile::TempDir;
233    use tokio::time::{Duration, Instant, timeout};
234    use tokio_util::sync::CancellationToken;
235    use tonic::Code;
236
237    const STUB_RESPONSE: &str = "stub response";
238
239    #[derive(Clone)]
240    struct StubProvider;
241
242    #[async_trait::async_trait]
243    impl Provider for StubProvider {
244        fn name(&self) -> &'static str {
245            "stub"
246        }
247
248        async fn complete(
249            &self,
250            _model_id: &ModelId,
251            _messages: Vec<steer_core::app::conversation::Message>,
252            _system: Option<steer_core::app::SystemContext>,
253            _tools: Option<Vec<steer_tools::ToolSchema>>,
254            _call_options: Option<steer_core::config::model::ModelParameters>,
255            _token: CancellationToken,
256        ) -> std::result::Result<CompletionResponse, ApiError> {
257            Ok(CompletionResponse {
258                content: vec![AssistantContent::Text {
259                    text: STUB_RESPONSE.to_string(),
260                }],
261                usage: None,
262            })
263        }
264    }
265
266    async fn setup_local_grpc_with_stub_provider(
267        default_model: ModelId,
268        workspace_root: std::path::PathBuf,
269    ) -> Result<LocalGrpcSetup> {
270        let in_memory_store = Arc::new(InMemoryEventStore::new());
271        let event_store: Arc<dyn steer_core::app::domain::session::EventStore> =
272            in_memory_store.clone();
273        let catalog: Arc<dyn SessionCatalog> = in_memory_store;
274        let catalog_config = CatalogConfig::default();
275
276        let model_registry = Arc::new(
277            steer_core::model_registry::ModelRegistry::load(&catalog_config.catalog_paths)
278                .map_err(GrpcError::CoreError)?,
279        );
280        let provider_registry = Arc::new(
281            steer_core::auth::ProviderRegistry::load(&catalog_config.catalog_paths)
282                .map_err(GrpcError::CoreError)?,
283        );
284
285        let auth_storage = Arc::new(steer_core::test_utils::InMemoryAuthStorage::new());
286        let llm_config_provider = steer_core::config::LlmConfigProvider::new(auth_storage)
287            .map_err(GrpcError::CoreError)?;
288
289        let api_client = Arc::new(ApiClient::new_with_deps(
290            llm_config_provider.clone(),
291            provider_registry.clone(),
292            model_registry.clone(),
293        ));
294        api_client.insert_test_provider(default_model.provider.clone(), Arc::new(StubProvider));
295
296        let workspace = steer_core::workspace::create_workspace(
297            &steer_core::workspace::WorkspaceConfig::Local {
298                path: workspace_root.clone(),
299            },
300        )
301        .await
302        .map_err(|e| GrpcError::InvalidSessionState {
303            reason: format!("Failed to create workspace: {e}"),
304        })?;
305
306        let environment_root = workspace_root.join(".steer-env");
307        let workspace_manager = Arc::new(
308            LocalWorkspaceManager::new(environment_root.clone())
309                .await
310                .map_err(|e| GrpcError::InvalidSessionState {
311                    reason: format!("Failed to create workspace manager: {e}"),
312                })?,
313        );
314        let repo_manager: Arc<dyn RepoManager> = workspace_manager.clone();
315
316        let tool_executor = ToolSystemBuilder::new(
317            workspace,
318            event_store.clone(),
319            api_client.clone(),
320            model_registry.clone(),
321        )
322        .with_workspace_manager(workspace_manager)
323        .with_repo_manager(repo_manager)
324        .build();
325
326        let runtime_service = RuntimeService::spawn(event_store, api_client, tool_executor);
327
328        let (channel, server_handle) = create_local_channel(
329            &runtime_service,
330            catalog,
331            model_registry,
332            provider_registry,
333            llm_config_provider,
334            environment_root,
335        )
336        .await?;
337
338        Ok(LocalGrpcSetup {
339            channel,
340            server_handle,
341            runtime_service,
342        })
343    }
344
345    async fn setup_local_grpc_with_stub_provider_and_store(
346        default_model: ModelId,
347        workspace_root: std::path::PathBuf,
348        session_db_path: Option<std::path::PathBuf>,
349    ) -> Result<LocalGrpcSetup> {
350        let (event_store, catalog): (
351            Arc<dyn steer_core::app::domain::session::EventStore>,
352            Arc<dyn SessionCatalog>,
353        ) = if let Some(db_path) = session_db_path {
354            let sqlite_store = Arc::new(
355                steer_core::app::domain::session::SqliteEventStore::new(&db_path)
356                    .await
357                    .map_err(|e| GrpcError::InvalidSessionState {
358                        reason: format!("Failed to create event store: {e}"),
359                    })?,
360            );
361            (sqlite_store.clone(), sqlite_store)
362        } else {
363            let in_memory_store = Arc::new(InMemoryEventStore::new());
364            (in_memory_store.clone(), in_memory_store)
365        };
366
367        let catalog_config = CatalogConfig::default();
368        let model_registry = Arc::new(
369            steer_core::model_registry::ModelRegistry::load(&catalog_config.catalog_paths)
370                .map_err(GrpcError::CoreError)?,
371        );
372        let provider_registry = Arc::new(
373            steer_core::auth::ProviderRegistry::load(&catalog_config.catalog_paths)
374                .map_err(GrpcError::CoreError)?,
375        );
376
377        let auth_storage = Arc::new(steer_core::test_utils::InMemoryAuthStorage::new());
378        let llm_config_provider = steer_core::config::LlmConfigProvider::new(auth_storage)
379            .map_err(GrpcError::CoreError)?;
380
381        let api_client = Arc::new(ApiClient::new_with_deps(
382            llm_config_provider.clone(),
383            provider_registry.clone(),
384            model_registry.clone(),
385        ));
386        api_client.insert_test_provider(default_model.provider.clone(), Arc::new(StubProvider));
387
388        let workspace = steer_core::workspace::create_workspace(
389            &steer_core::workspace::WorkspaceConfig::Local {
390                path: workspace_root.clone(),
391            },
392        )
393        .await
394        .map_err(|e| GrpcError::InvalidSessionState {
395            reason: format!("Failed to create workspace: {e}"),
396        })?;
397
398        let environment_root = workspace_root.join(".steer-env");
399        let workspace_manager = Arc::new(
400            LocalWorkspaceManager::new(environment_root.clone())
401                .await
402                .map_err(|e| GrpcError::InvalidSessionState {
403                    reason: format!("Failed to create workspace manager: {e}"),
404                })?,
405        );
406        let repo_manager: Arc<dyn RepoManager> = workspace_manager.clone();
407
408        let tool_executor = ToolSystemBuilder::new(
409            workspace,
410            event_store.clone(),
411            api_client.clone(),
412            model_registry.clone(),
413        )
414        .with_workspace_manager(workspace_manager)
415        .with_repo_manager(repo_manager)
416        .build();
417
418        let runtime_service = RuntimeService::spawn(event_store, api_client, tool_executor);
419
420        let (channel, server_handle) = create_local_channel(
421            &runtime_service,
422            catalog,
423            model_registry,
424            provider_registry,
425            llm_config_provider,
426            environment_root,
427        )
428        .await?;
429
430        Ok(LocalGrpcSetup {
431            channel,
432            server_handle,
433            runtime_service,
434        })
435    }
436
437    fn test_workspace_root() -> TempDir {
438        TempDir::new().expect("workspace tempdir")
439    }
440
441    async fn next_event(
442        stream: &mut tonic::Streaming<steer_proto::agent::v1::SessionEvent>,
443    ) -> steer_proto::agent::v1::SessionEvent {
444        timeout(Duration::from_secs(5), stream.message())
445            .await
446            .expect("timeout")
447            .expect("stream ok")
448            .expect("event")
449    }
450
451    async fn wait_for_processing_completed(
452        stream: &mut tonic::Streaming<steer_proto::agent::v1::SessionEvent>,
453        op_id: &str,
454    ) {
455        loop {
456            let event = next_event(stream).await;
457            if matches!(
458                event.event,
459                Some(steer_proto::agent::v1::session_event::Event::ProcessingCompleted(
460                    ref e
461                )) if e.op_id == op_id
462            ) {
463                break;
464            }
465        }
466    }
467
468    fn proto_event_kind(event: &steer_proto::agent::v1::SessionEvent) -> &'static str {
469        use steer_proto::agent::v1::session_event::Event;
470
471        match event.event {
472            Some(Event::AssistantMessageAdded(_)) => "AssistantMessageAdded",
473            Some(Event::UserMessageAdded(_)) => "UserMessageAdded",
474            Some(Event::ToolMessageAdded(_)) => "ToolMessageAdded",
475            Some(Event::MessageUpdated(_)) => "MessageUpdated",
476            Some(Event::ToolCallStarted(_)) => "ToolCallStarted",
477            Some(Event::ToolCallCompleted(_)) => "ToolCallCompleted",
478            Some(Event::ToolCallFailed(_)) => "ToolCallFailed",
479            Some(Event::ProcessingStarted(_)) => "ProcessingStarted",
480            Some(Event::ProcessingCompleted(_)) => "ProcessingCompleted",
481            Some(Event::RequestToolApproval(_)) => "RequestToolApproval",
482            Some(Event::OperationCancelled(_)) => "OperationCancelled",
483            Some(Event::Error(_)) => "Error",
484            Some(Event::WorkspaceChanged(_)) => "WorkspaceChanged",
485            Some(Event::ConversationCompacted(_)) => "ConversationCompacted",
486            Some(Event::StreamDelta(_)) => "StreamDelta",
487            Some(Event::CompactResult(_)) => "CompactResult",
488            Some(Event::McpServerStateChanged(_)) => "McpServerStateChanged",
489            Some(Event::SessionConfigUpdated(_)) => "SessionConfigUpdated",
490            Some(Event::QueueUpdated(_)) => "QueueUpdated",
491            Some(Event::LlmUsageUpdated(_)) => "LlmUsageUpdated",
492            None => "None",
493        }
494    }
495
496    fn core_event_kind(event: &SessionEvent) -> &'static str {
497        match event {
498            SessionEvent::SessionCreated { .. } => "SessionCreated",
499            SessionEvent::SessionConfigUpdated { .. } => "SessionConfigUpdated",
500            SessionEvent::AssistantMessageAdded { .. } => "AssistantMessageAdded",
501            SessionEvent::UserMessageAdded { .. } => "UserMessageAdded",
502            SessionEvent::ToolMessageAdded { .. } => "ToolMessageAdded",
503            SessionEvent::MessageUpdated { .. } => "MessageUpdated",
504            SessionEvent::ToolCallStarted { .. } => "ToolCallStarted",
505            SessionEvent::ToolCallCompleted { .. } => "ToolCallCompleted",
506            SessionEvent::ToolCallFailed { .. } => "ToolCallFailed",
507            SessionEvent::ApprovalRequested { .. } => "ApprovalRequested",
508            SessionEvent::ApprovalDecided { .. } => "ApprovalDecided",
509            SessionEvent::OperationStarted { .. } => "OperationStarted",
510            SessionEvent::OperationCompleted { .. } => "OperationCompleted",
511            SessionEvent::OperationCancelled { .. } => "OperationCancelled",
512            SessionEvent::LlmUsageUpdated { .. } => "LlmUsageUpdated",
513            SessionEvent::CompactResult { .. } => "CompactResult",
514            SessionEvent::ConversationCompacted { .. } => "ConversationCompacted",
515            SessionEvent::WorkspaceChanged => "WorkspaceChanged",
516            SessionEvent::QueueUpdated { .. } => "QueueUpdated",
517            SessionEvent::Error { .. } => "Error",
518            SessionEvent::McpServerStateChanged { .. } => "McpServerStateChanged",
519        }
520    }
521
522    #[tokio::test]
523    async fn test_since_sequence_replay_returns_persisted_events() {
524        let workspace_root = test_workspace_root();
525        let setup = setup_local_grpc_with_catalog(
526            steer_core::config::model::builtin::claude_sonnet_4_5(),
527            None,
528            CatalogConfig::default(),
529            Some(workspace_root.path().to_path_buf()),
530        )
531        .await
532        .expect("local grpc setup");
533
534        let session_id = setup
535            .runtime_service
536            .handle()
537            .create_session(steer_core::test_utils::read_only_session_config(
538                steer_core::config::model::builtin::claude_sonnet_4_5(),
539            ))
540            .await
541            .expect("create session");
542
543        let op_id = OpId::new();
544        setup
545            .runtime_service
546            .handle()
547            .dispatch_action(
548                session_id,
549                Action::ModelResponseError {
550                    session_id,
551                    op_id,
552                    error: "boom".to_string(),
553                },
554            )
555            .await
556            .expect("dispatch action");
557
558        let mut client = AgentServiceClient::new(setup.channel.clone());
559        let request = tonic::Request::new(SubscribeSessionEventsRequest {
560            session_id: session_id.to_string(),
561            since_sequence: Some(0),
562        });
563
564        let mut stream = client
565            .subscribe_session_events(request)
566            .await
567            .expect("subscribe")
568            .into_inner();
569
570        let mut events = Vec::new();
571        for _ in 0..2 {
572            let event = timeout(Duration::from_secs(2), stream.message())
573                .await
574                .expect("timeout")
575                .expect("stream ok")
576                .expect("event");
577            events.push(event);
578        }
579
580        assert!(events.iter().any(|evt| matches!(
581            evt.event,
582            Some(steer_proto::agent::v1::session_event::Event::Error(_))
583        )));
584        assert!(events.iter().any(|evt| matches!(
585            evt.event,
586            Some(steer_proto::agent::v1::session_event::Event::ProcessingCompleted(_))
587        )));
588    }
589
590    #[tokio::test]
591    async fn test_compaction_flow_end_to_end() {
592        let workspace_root = test_workspace_root();
593        let model = steer_core::config::model::builtin::claude_sonnet_4_5();
594        let setup =
595            setup_local_grpc_with_stub_provider(model.clone(), workspace_root.path().to_path_buf())
596                .await
597                .expect("local grpc setup");
598
599        let session_id = setup
600            .runtime_service
601            .handle()
602            .create_session(steer_core::test_utils::read_only_session_config(
603                steer_core::config::model::builtin::claude_sonnet_4_5(),
604            ))
605            .await
606            .expect("create session");
607
608        let mut event_client = AgentServiceClient::new(setup.channel.clone());
609        let mut action_client = AgentServiceClient::new(setup.channel.clone());
610
611        let request = tonic::Request::new(SubscribeSessionEventsRequest {
612            session_id: session_id.to_string(),
613            since_sequence: None,
614        });
615
616        let mut stream = event_client
617            .subscribe_session_events(request)
618            .await
619            .expect("subscribe")
620            .into_inner();
621
622        let model_spec = crate::grpc::conversions::model_to_proto(model.clone());
623
624        let first_response = action_client
625            .send_message(tonic::Request::new(SendMessageRequest {
626                session_id: session_id.to_string(),
627                message: "first".to_string(),
628                content: Vec::new(),
629                model: Some(model_spec.clone()),
630            }))
631            .await
632            .expect("send_message");
633        let first_op = first_response.into_inner().operation.expect("operation");
634        wait_for_processing_completed(&mut stream, &first_op.id).await;
635
636        let second_response = action_client
637            .send_message(tonic::Request::new(SendMessageRequest {
638                session_id: session_id.to_string(),
639                message: "second".to_string(),
640                content: Vec::new(),
641                model: Some(model_spec.clone()),
642            }))
643            .await
644            .expect("send_message");
645        let second_op = second_response.into_inner().operation.expect("operation");
646        wait_for_processing_completed(&mut stream, &second_op.id).await;
647
648        action_client
649            .compact_session(tonic::Request::new(CompactSessionRequest {
650                session_id: session_id.to_string(),
651                model: Some(model_spec),
652            }))
653            .await
654            .expect("compact_session");
655
656        let mut compact_summary = None;
657        let mut compaction_record = None;
658
659        while compact_summary.is_none() || compaction_record.is_none() {
660            let event = next_event(&mut stream).await;
661            match event.event {
662                Some(steer_proto::agent::v1::session_event::Event::CompactResult(e)) => {
663                    let result = e.result.expect("compact result");
664                    match result.result {
665                        Some(steer_proto::agent::v1::compact_result::Result::Success(success)) => {
666                            compact_summary = Some(success.summary);
667                        }
668                        other => panic!("unexpected compact result: {other:?}"),
669                    }
670                }
671                Some(steer_proto::agent::v1::session_event::Event::ConversationCompacted(e)) => {
672                    compaction_record = Some(e.record.expect("compaction record"));
673                }
674                _ => {}
675            }
676        }
677
678        assert_eq!(compact_summary.expect("summary"), STUB_RESPONSE);
679        let record = compaction_record.expect("record");
680        assert!(!record.id.is_empty());
681        assert_eq!(record.model, model.id);
682    }
683
684    #[tokio::test]
685    async fn test_compaction_emits_processing_completed_with_sqlite_store() {
686        let workspace_root = test_workspace_root();
687        let db_path = workspace_root.path().join("sessions.db");
688        let model = steer_core::config::model::builtin::claude_sonnet_4_5();
689        let setup = setup_local_grpc_with_stub_provider_and_store(
690            model.clone(),
691            workspace_root.path().to_path_buf(),
692            Some(db_path.clone()),
693        )
694        .await
695        .expect("local grpc setup");
696
697        let session_id = setup
698            .runtime_service
699            .handle()
700            .create_session(steer_core::test_utils::read_only_session_config(
701                steer_core::config::model::builtin::claude_sonnet_4_5(),
702            ))
703            .await
704            .expect("create session");
705
706        let mut event_client = AgentServiceClient::new(setup.channel.clone());
707        let mut action_client = AgentServiceClient::new(setup.channel.clone());
708
709        let request = tonic::Request::new(SubscribeSessionEventsRequest {
710            session_id: session_id.to_string(),
711            since_sequence: None,
712        });
713
714        let mut stream = event_client
715            .subscribe_session_events(request)
716            .await
717            .expect("subscribe")
718            .into_inner();
719
720        let sqlite_store = SqliteEventStore::new(&db_path)
721            .await
722            .expect("open sqlite store");
723        let before_len = sqlite_store
724            .load_events(session_id)
725            .await
726            .expect("load events")
727            .len();
728
729        let model_spec = crate::grpc::conversions::model_to_proto(model.clone());
730
731        let first_response = action_client
732            .send_message(tonic::Request::new(SendMessageRequest {
733                session_id: session_id.to_string(),
734                message: "first".to_string(),
735                content: Vec::new(),
736                model: Some(model_spec.clone()),
737            }))
738            .await
739            .expect("send_message");
740        let first_op = first_response.into_inner().operation.expect("operation");
741        wait_for_processing_completed(&mut stream, &first_op.id).await;
742
743        let second_response = action_client
744            .send_message(tonic::Request::new(SendMessageRequest {
745                session_id: session_id.to_string(),
746                message: "second".to_string(),
747                content: Vec::new(),
748                model: Some(model_spec.clone()),
749            }))
750            .await
751            .expect("send_message");
752        let second_op = second_response.into_inner().operation.expect("operation");
753        wait_for_processing_completed(&mut stream, &second_op.id).await;
754
755        action_client
756            .compact_session(tonic::Request::new(CompactSessionRequest {
757                session_id: session_id.to_string(),
758                model: Some(model_spec),
759            }))
760            .await
761            .expect("compact_session");
762
763        let mut seen = Vec::new();
764        let mut compaction_op_id = None;
765        let mut completed = false;
766        let deadline = Instant::now() + Duration::from_secs(5);
767
768        while Instant::now() < deadline {
769            let remaining = deadline.saturating_duration_since(Instant::now());
770            let next = timeout(remaining, stream.message()).await;
771            let Ok(Ok(Some(event))) = next else { break };
772
773            seen.push(proto_event_kind(&event));
774
775            if let Some(steer_proto::agent::v1::session_event::Event::ProcessingStarted(ref e)) =
776                event.event
777            {
778                compaction_op_id = Some(e.op_id.clone());
779            }
780
781            if let Some(steer_proto::agent::v1::session_event::Event::ProcessingCompleted(e)) =
782                event.event
783            {
784                if compaction_op_id.as_deref() == Some(e.op_id.as_str()) {
785                    completed = true;
786                    break;
787                }
788            }
789        }
790
791        if !completed {
792            let persisted = sqlite_store
793                .load_events(session_id)
794                .await
795                .expect("load events");
796            let persisted_kinds: Vec<&'static str> = persisted
797                .iter()
798                .map(|(_, event)| core_event_kind(event))
799                .collect();
800
801            let after_len = persisted.len();
802            let new_events = persisted_kinds
803                .iter()
804                .skip(before_len)
805                .copied()
806                .collect::<Vec<_>>();
807
808            panic!(
809                "missing ProcessingCompleted for compaction; seen={seen:?} \
810persisted_new={new_events:?} (before_len={before_len}, after_len={after_len})"
811            );
812        }
813    }
814
815    #[tokio::test]
816    async fn test_send_message_uses_session_default_model_when_not_specified() {
817        let workspace_root = test_workspace_root();
818        let setup = setup_local_grpc_with_catalog(
819            steer_core::config::model::builtin::claude_sonnet_4_5(),
820            None,
821            CatalogConfig::default(),
822            Some(workspace_root.path().to_path_buf()),
823        )
824        .await
825        .expect("local grpc setup");
826
827        let session_id = setup
828            .runtime_service
829            .handle()
830            .create_session(steer_core::test_utils::read_only_session_config(
831                steer_core::config::model::builtin::claude_sonnet_4_5(),
832            ))
833            .await
834            .expect("create session");
835
836        let mut client = AgentServiceClient::new(setup.channel.clone());
837        let request = tonic::Request::new(SendMessageRequest {
838            session_id: session_id.to_string(),
839            message: "hello".to_string(),
840            content: Vec::new(),
841            model: None,
842        });
843
844        let response = client
845            .send_message(request)
846            .await
847            .expect("send_message should succeed using session default model")
848            .into_inner();
849
850        assert!(
851            response.operation.is_some(),
852            "Response should contain an operation"
853        );
854    }
855
856    #[tokio::test]
857    async fn test_compact_session_requires_model_spec() {
858        let workspace_root = test_workspace_root();
859        let setup = setup_local_grpc_with_catalog(
860            steer_core::config::model::builtin::claude_sonnet_4_5(),
861            None,
862            CatalogConfig::default(),
863            Some(workspace_root.path().to_path_buf()),
864        )
865        .await
866        .expect("local grpc setup");
867
868        let session_id = setup
869            .runtime_service
870            .handle()
871            .create_session(steer_core::test_utils::read_only_session_config(
872                steer_core::config::model::builtin::claude_sonnet_4_5(),
873            ))
874            .await
875            .expect("create session");
876
877        let mut client = AgentServiceClient::new(setup.channel.clone());
878        let request = tonic::Request::new(CompactSessionRequest {
879            session_id: session_id.to_string(),
880            model: None,
881        });
882
883        let err = client
884            .compact_session(request)
885            .await
886            .expect_err("compact_session should fail without model");
887        assert_eq!(err.code(), Code::InvalidArgument);
888    }
889
890    #[tokio::test]
891    async fn test_list_models_sets_context_window_tokens_field() {
892        let workspace_root = test_workspace_root();
893        let setup = setup_local_grpc_with_catalog(
894            steer_core::config::model::builtin::claude_sonnet_4_5(),
895            None,
896            CatalogConfig::default(),
897            Some(workspace_root.path().to_path_buf()),
898        )
899        .await
900        .expect("local grpc setup");
901
902        let mut client = AgentServiceClient::new(setup.channel.clone());
903        let response = client
904            .list_models(tonic::Request::new(ListModelsRequest { provider_id: None }))
905            .await
906            .expect("list_models should succeed")
907            .into_inner();
908
909        assert!(
910            !response.models.is_empty(),
911            "Expected at least one recommended model"
912        );
913        assert!(
914            response
915                .models
916                .iter()
917                .all(|model| model.context_window_tokens.is_some()),
918            "expected context_window_tokens to be populated for all listed models"
919        );
920    }
921
922    #[tokio::test]
923    async fn test_execute_bash_command_does_not_require_model_spec() {
924        let workspace_root = test_workspace_root();
925        let setup = setup_local_grpc_with_catalog(
926            steer_core::config::model::builtin::claude_sonnet_4_5(),
927            None,
928            CatalogConfig::default(),
929            Some(workspace_root.path().to_path_buf()),
930        )
931        .await
932        .expect("local grpc setup");
933
934        let session_id = setup
935            .runtime_service
936            .handle()
937            .create_session(steer_core::test_utils::read_only_session_config(
938                steer_core::config::model::builtin::claude_sonnet_4_5(),
939            ))
940            .await
941            .expect("create session");
942
943        let mut client = AgentServiceClient::new(setup.channel.clone());
944        let request = tonic::Request::new(ExecuteBashCommandRequest {
945            session_id: session_id.to_string(),
946            command: "echo hi".to_string(),
947        });
948
949        client
950            .execute_bash_command(request)
951            .await
952            .expect("execute_bash_command should succeed without model");
953    }
954}