Skip to main content

steer_grpc/grpc/
runtime_server.rs

1use crate::grpc::conversions::{
2    environment_descriptor_to_proto, message_to_proto, model_to_proto, proto_to_model,
3    proto_to_session_policy_overrides, proto_to_tool_config, proto_to_workspace_config,
4    repo_info_to_proto, session_event_to_proto, stream_delta_to_proto, workspace_info_to_proto,
5    workspace_status_to_proto,
6};
7use std::cmp::Ordering as CmpOrdering;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::{Duration, Instant};
12use steer_core::app::conversation::UserContent;
13use steer_core::app::domain::runtime::{RuntimeError, RuntimeHandle};
14use steer_core::app::domain::session::{SessionCatalog, SessionFilter};
15use steer_core::app::domain::types::SessionId;
16use steer_core::auth::api_key::ApiKeyAuthFlow;
17use steer_core::auth::{
18    AuthFlowWrapper, AuthMethod, AuthSource, DynAuthenticationFlow, ModelId as AuthModelId,
19    ModelVisibilityPolicy, ProviderId as AuthProviderId,
20};
21use steer_core::session::state::SessionConfig;
22use steer_proto::agent::v1::{
23    self as proto, ApproveToolRequest, ApproveToolResponse, CancelOperationRequest,
24    CancelOperationResponse, CompactSessionRequest, CompactSessionResponse, CreateSessionRequest,
25    CreateSessionResponse, DeleteSessionRequest, DeleteSessionResponse, DequeueQueuedItemRequest,
26    DequeueQueuedItemResponse, EditMessageRequest, EditMessageResponse, ExecuteBashCommandRequest,
27    ExecuteBashCommandResponse, GetConversationFooter, GetConversationRequest,
28    GetConversationResponse, GetMcpServersRequest, GetMcpServersResponse, GetSessionRequest,
29    GetSessionResponse, ListFilesRequest, ListFilesResponse, ListModelsRequest, ListModelsResponse,
30    ListProvidersRequest, ListProvidersResponse, ListSessionsRequest, ListSessionsResponse,
31    Operation, OperationStatus, OperationType, SendMessageRequest, SendMessageResponse,
32    SessionEvent, SessionInfo, SessionStateFooter, SessionStateHeader,
33    SubscribeSessionEventsRequest, SwitchPrimaryAgentRequest, SwitchPrimaryAgentResponse,
34    agent_service_server, get_conversation_response, get_session_response,
35};
36use steer_workspace::{EnvironmentManager, RepoManager, WorkspaceManager};
37use tokio::sync::{Mutex, broadcast, mpsc};
38use tokio_stream::wrappers::ReceiverStream;
39use tonic::{Request, Response, Status};
40use tracing::{debug, error, info, warn};
41use uuid::Uuid;
42
/// gRPC service state backing the `agent_service_server::AgentService` implementation
/// found later in this file.
///
/// Every field is a handle to a collaborator supplied through [`RuntimeAgentDeps`],
/// except `auth_flow_manager`, which the constructor creates itself.
43pub struct RuntimeAgentService {
    // Session runtime: resumes, creates and deletes sessions and hands out event/delta
    // subscriptions.
44    runtime: RuntimeHandle,
    // Session catalog: queried for session listings and stored session configs.
45    catalog: Arc<dyn SessionCatalog>,
    // Consulted by `select_default_model` to find recommended models.
46    model_registry: Arc<steer_core::model_registry::ModelRegistry>,
    // Consulted by `create_auth_flow` to validate provider ids and read provider names.
47    provider_registry: Arc<steer_core::auth::ProviderRegistry>,
    // Source of the auth storage and the plugin registry used by `create_auth_flow`.
48    llm_config_provider: steer_core::config::LlmConfigProvider,
    // NOTE(review): not referenced in the visible part of this file — presumably used by
    // environment-related RPCs further down; confirm.
49    environment_manager: Arc<dyn EnvironmentManager>,
    // Resolves a local path to workspace info when a session is created.
50    workspace_manager: Arc<dyn WorkspaceManager>,
    // Resolves a local path to repository info when a session is created.
51    repo_manager: Arc<dyn RepoManager>,
    // In-memory table of authentication flows that have been started but not finished.
52    auth_flow_manager: Arc<AuthFlowManager>,
53}
54
/// Maximum age (10 minutes) of a pending auth flow; `AuthFlowManager::cleanup` drops
/// entries whose `last_updated` is older than this.
55const AUTH_FLOW_TTL: Duration = Duration::from_secs(10 * 60);
56
/// One pending authentication flow as stored by `AuthFlowManager`.
57struct AuthFlowEntry {
    // The flow implementation driving this authentication attempt.
58    flow: Arc<dyn DynAuthenticationFlow>,
    // Type-erased flow state; presumably downcast by the owning flow — confirm against
    // the code that consumes entries.
59    state: Box<dyn std::any::Any + Send + Sync>,
    // Instant this entry was last touched; compared against `AUTH_FLOW_TTL` on cleanup.
60    last_updated: Instant,
61}
62
63#[derive(Default)]
64struct AuthFlowManager {
65    flows: Mutex<HashMap<String, AuthFlowEntry>>,
66}
67
68impl AuthFlowManager {
69    fn new() -> Self {
70        Self::default()
71    }
72
73    async fn insert(&self, flow_id: String, entry: AuthFlowEntry) {
74        let mut flows = self.flows.lock().await;
75        flows.insert(flow_id, entry);
76    }
77
78    async fn take(&self, flow_id: &str) -> Option<AuthFlowEntry> {
79        let mut flows = self.flows.lock().await;
80        flows.remove(flow_id)
81    }
82
83    async fn cleanup(&self) {
84        let mut flows = self.flows.lock().await;
85        flows.retain(|_, entry| entry.last_updated.elapsed() <= AUTH_FLOW_TTL);
86    }
87}
88
/// Bundle of collaborators consumed by `RuntimeAgentService::new`.
///
/// Each field is moved unchanged into the service field of the same name.
89pub struct RuntimeAgentDeps {
    // Handle to the session runtime.
90    pub runtime: RuntimeHandle,
    // Catalog of known sessions.
91    pub catalog: Arc<dyn SessionCatalog>,
    // Provider of auth storage and the provider plugin registry.
92    pub llm_config_provider: steer_core::config::LlmConfigProvider,
    // Registry of known models.
93    pub model_registry: Arc<steer_core::model_registry::ModelRegistry>,
    // Registry of known providers.
94    pub provider_registry: Arc<steer_core::auth::ProviderRegistry>,
    // Environment manager implementation.
95    pub environment_manager: Arc<dyn EnvironmentManager>,
    // Workspace manager implementation.
96    pub workspace_manager: Arc<dyn WorkspaceManager>,
    // Repository manager implementation.
97    pub repo_manager: Arc<dyn RepoManager>,
98}
99
100impl RuntimeAgentService {
101    pub fn new(deps: RuntimeAgentDeps) -> Self {
102        Self {
103            runtime: deps.runtime,
104            catalog: deps.catalog,
105            llm_config_provider: deps.llm_config_provider,
106            model_registry: deps.model_registry,
107            provider_registry: deps.provider_registry,
108            environment_manager: deps.environment_manager,
109            workspace_manager: deps.workspace_manager,
110            repo_manager: deps.repo_manager,
111            auth_flow_manager: Arc::new(AuthFlowManager::new()),
112        }
113    }
114
115    #[expect(clippy::result_large_err)]
116    fn parse_session_id(session_id: &str) -> Result<SessionId, Status> {
117        Uuid::parse_str(session_id)
118            .map(SessionId::from)
119            .map_err(|_| Status::invalid_argument(format!("Invalid session ID: {session_id}")))
120    }
121    fn parse_environment_id(
122        environment_id: &str,
123    ) -> Result<steer_workspace::EnvironmentId, Status> {
124        if environment_id.is_empty() {
125            return Ok(steer_workspace::EnvironmentId::local());
126        }
127        let id = Uuid::parse_str(environment_id).map_err(|_| {
128            Status::invalid_argument(format!("Invalid environment ID: {environment_id}"))
129        })?;
130        Ok(steer_workspace::EnvironmentId::from_uuid(id))
131    }
132
133    fn parse_workspace_id(workspace_id: &str) -> Result<steer_workspace::WorkspaceId, Status> {
134        let id = Uuid::parse_str(workspace_id).map_err(|_| {
135            Status::invalid_argument(format!("Invalid workspace ID: {workspace_id}"))
136        })?;
137        Ok(steer_workspace::WorkspaceId::from_uuid(id))
138    }
139
140    fn parse_repo_id(repo_id: &str) -> Result<steer_workspace::RepoId, Status> {
141        let id = Uuid::parse_str(repo_id)
142            .map_err(|_| Status::invalid_argument(format!("Invalid repo ID: {repo_id}")))?;
143        Ok(steer_workspace::RepoId::from_uuid(id))
144    }
145
146    fn select_default_model(&self) -> steer_core::config::model::ModelId {
147        let builtin_default = steer_core::config::model::builtin::default_model();
148
149        if let Some(config) = self.model_registry.get(&builtin_default)
150            && config.recommended
151        {
152            return builtin_default;
153        }
154
155        let mut recommended: Vec<_> = self.model_registry.recommended().collect();
156        if recommended.is_empty() {
157            return builtin_default;
158        }
159
160        recommended.sort_by(|a, b| {
161            let provider_cmp = a.provider.as_str().cmp(b.provider.as_str());
162            if provider_cmp == CmpOrdering::Equal {
163                a.id.cmp(&b.id)
164            } else {
165                provider_cmp
166            }
167        });
168
169        let chosen = recommended[0];
170        steer_core::config::model::ModelId::new(chosen.provider.clone(), chosen.id.clone())
171    }
172
173    fn workspace_manager_error_to_status(err: steer_workspace::WorkspaceManagerError) -> Status {
174        match err {
175            steer_workspace::WorkspaceManagerError::NotFound(msg) => Status::not_found(msg),
176            steer_workspace::WorkspaceManagerError::NotSupported(msg) => {
177                Status::failed_precondition(msg)
178            }
179            steer_workspace::WorkspaceManagerError::InvalidRequest(msg) => {
180                Status::invalid_argument(msg)
181            }
182            steer_workspace::WorkspaceManagerError::Io(msg)
183            | steer_workspace::WorkspaceManagerError::Other(msg) => Status::internal(msg),
184        }
185    }
186
187    fn environment_manager_error_to_status(
188        err: steer_workspace::EnvironmentManagerError,
189    ) -> Status {
190        match err {
191            steer_workspace::EnvironmentManagerError::NotFound(msg) => Status::not_found(msg),
192            steer_workspace::EnvironmentManagerError::NotSupported(msg) => {
193                Status::failed_precondition(msg)
194            }
195            steer_workspace::EnvironmentManagerError::InvalidRequest(msg) => {
196                Status::invalid_argument(msg)
197            }
198            steer_workspace::EnvironmentManagerError::Io(msg)
199            | steer_workspace::EnvironmentManagerError::Other(msg) => Status::internal(msg),
200        }
201    }
202
203    fn create_auth_flow(
204        &self,
205        provider_id: &steer_core::config::provider::ProviderId,
206    ) -> Result<(Arc<dyn DynAuthenticationFlow>, AuthMethod), Status> {
207        let provider_cfg = self.provider_registry.get(provider_id).ok_or_else(|| {
208            Status::not_found(format!("Unknown provider: {}", provider_id.as_str()))
209        })?;
210        let provider_name = provider_cfg.name.clone();
211        let auth_storage = self.llm_config_provider.auth_storage().clone();
212
213        if let Some(plugin) = self.llm_config_provider.plugin_registry().get(provider_id)
214            && let Some(flow) = plugin.create_flow(auth_storage.clone())
215        {
216            let methods = flow.available_methods();
217            let method = if methods.contains(&AuthMethod::OAuth) {
218                AuthMethod::OAuth
219            } else if methods.contains(&AuthMethod::ApiKey) {
220                AuthMethod::ApiKey
221            } else {
222                return Err(Status::failed_precondition(format!(
223                    "No supported auth methods for provider {}",
224                    provider_id.as_str()
225                )));
226            };
227            return Ok((Arc::from(flow), method));
228        }
229
230        let flow = AuthFlowWrapper::new(ApiKeyAuthFlow::new(
231            auth_storage,
232            provider_id.clone(),
233            provider_name,
234        ));
235        Ok((Arc::new(flow), AuthMethod::ApiKey))
236    }
237}
238
239#[tonic::async_trait]
240impl agent_service_server::AgentService for RuntimeAgentService {
    // Session events are pushed through a channel; the RPC returns its receiving half.
241    type SubscribeSessionEventsStream = ReceiverStream<Result<SessionEvent, Status>>;
    // File listings are likewise delivered through a channel-backed stream.
242    type ListFilesStream = ReceiverStream<Result<ListFilesResponse, Status>>;
    // Boxed stream type: `get_session` builds its stream with `async_stream::try_stream!`
    // and returns it pinned in a box.
243    type GetSessionStream =
244        std::pin::Pin<Box<dyn futures::Stream<Item = Result<GetSessionResponse, Status>> + Send>>;
    // Boxed stream type: `get_conversation` builds its stream with
    // `async_stream::try_stream!`.
245    type GetConversationStream = std::pin::Pin<
246        Box<dyn futures::Stream<Item = Result<GetConversationResponse, Status>> + Send>,
247    >;
248
249    async fn subscribe_session_events(
250        &self,
251        request: Request<SubscribeSessionEventsRequest>,
252    ) -> Result<Response<Self::SubscribeSessionEventsStream>, Status> {
253        let req = request.into_inner();
254        let session_id = Self::parse_session_id(&req.session_id)?;
255
256        if let Err(e) = self.runtime.resume_session(session_id).await
257            && !matches!(e, RuntimeError::SessionNotFound { .. })
258        {
259            error!("Failed to resume session {}: {}", session_id, e);
260        }
261
262        let subscription = self
263            .runtime
264            .subscribe_events(session_id)
265            .await
266            .map_err(|e| Status::internal(format!("Failed to subscribe: {e}")))?;
267
268        let delta_subscription = self
269            .runtime
270            .subscribe_deltas(session_id)
271            .await
272            .map_err(|e| Status::internal(format!("Failed to subscribe to deltas: {e}")))?;
273
274        let (tx, rx) = mpsc::channel(100);
275        let last_sequence = Arc::new(AtomicU64::new(req.since_sequence.unwrap_or(0)));
276        let delta_sequence = Arc::new(AtomicU64::new(0));
277
278        let mut min_live_seq = req.since_sequence.map(|seq| seq.saturating_add(1));
279
280        if let Some(after_seq) = req.since_sequence {
281            match self.runtime.load_events_after(session_id, after_seq).await {
282                Ok(events) => {
283                    let mut last_seq = after_seq;
284                    for (seq, event) in events {
285                        last_seq = last_seq.max(seq);
286                        let proto_event = match session_event_to_proto(event, seq) {
287                            Ok(event) => event,
288                            Err(e) => {
289                                warn!("Failed to convert session replay event: {}", e);
290                                continue;
291                            }
292                        };
293
294                        if proto_event.event.is_none() {
295                            continue;
296                        }
297
298                        if let Err(e) = tx.send(Ok(proto_event)).await {
299                            warn!("Failed to send replay event to client: {}", e);
300                            break;
301                        }
302                    }
303                    min_live_seq = Some(last_seq.saturating_add(1));
304                    last_sequence.store(last_seq, Ordering::Relaxed);
305                }
306                Err(e) => {
307                    warn!("Failed to load replay events: {}", e);
308                }
309            }
310        }
311
312        let event_tx = tx.clone();
313        let last_sequence_events = last_sequence.clone();
314        let delta_sequence_counter = delta_sequence.clone();
315        let min_live_seq = min_live_seq;
316        tokio::spawn(async move {
317            async fn send_delta(
318                delta: steer_core::app::domain::delta::StreamDelta,
319                tx: &mpsc::Sender<Result<proto::SessionEvent, Status>>,
320                last_sequence: &Arc<AtomicU64>,
321                delta_sequence: &Arc<AtomicU64>,
322            ) -> Result<(), ()> {
323                let sequence_num = last_sequence.load(Ordering::Relaxed);
324                let delta_sequence = delta_sequence.fetch_add(1, Ordering::Relaxed);
325                let proto_event = match stream_delta_to_proto(delta, sequence_num, delta_sequence) {
326                    Ok(event) => event,
327                    Err(e) => {
328                        warn!("Failed to convert stream delta: {}", e);
329                        return Ok(());
330                    }
331                };
332
333                if let Err(e) = tx.send(Ok(proto_event)).await {
334                    warn!("Failed to send delta to client: {}", e);
335                    return Err(());
336                }
337
338                Ok(())
339            }
340
341            let mut subscription = subscription;
342            let mut delta_rx = delta_subscription;
343            let mut events_closed = false;
344            let mut deltas_closed = false;
345
346            loop {
347                if events_closed && deltas_closed {
348                    break;
349                }
350
351                tokio::select! {
352                    envelope = subscription.recv(), if !events_closed => {
353                        match envelope {
354                            Some(envelope) => {
355                                loop {
356                                    match delta_rx.try_recv() {
357                                        Ok(delta) => {
358                                            if send_delta(
359                                                delta,
360                                                &event_tx,
361                                                &last_sequence_events,
362                                                &delta_sequence_counter,
363                                            )
364                                            .await
365                                            .is_err()
366                                            {
367                                                return;
368                                            }
369                                        }
370                                        Err(broadcast::error::TryRecvError::Empty) => break,
371                                        Err(broadcast::error::TryRecvError::Lagged(skipped)) => {
372                                            warn!("Delta subscription lagged by {} messages", skipped);
373                                        }
374                                        Err(broadcast::error::TryRecvError::Closed) => {
375                                            deltas_closed = true;
376                                            break;
377                                        }
378                                    }
379                                }
380
381                                if let Some(min_seq) = min_live_seq
382                                    && envelope.seq < min_seq {
383                                        continue;
384                                    }
385
386                                let proto_event = match session_event_to_proto(envelope.event, envelope.seq) {
387                                    Ok(event) => event,
388                                    Err(e) => {
389                                        warn!("Failed to convert session event: {}", e);
390                                        continue;
391                                    }
392                                };
393
394                                if proto_event.event.is_none() {
395                                    continue;
396                                }
397
398                                if let Err(e) = event_tx.send(Ok(proto_event)).await {
399                                    warn!("Failed to send event to client: {}", e);
400                                    break;
401                                }
402                                last_sequence_events.store(envelope.seq, Ordering::Relaxed);
403                            }
404                            None => {
405                                events_closed = true;
406                            }
407                        }
408                    }
409                    delta = delta_rx.recv(), if !deltas_closed => {
410                        match delta {
411                            Ok(delta) => {
412                                if send_delta(
413                                    delta,
414                                    &event_tx,
415                                    &last_sequence_events,
416                                    &delta_sequence_counter,
417                                )
418                                .await
419                                .is_err()
420                                {
421                                    break;
422                                }
423                            }
424                            Err(broadcast::error::RecvError::Lagged(skipped)) => {
425                                warn!("Delta subscription lagged by {} messages", skipped);
426                            }
427                            Err(broadcast::error::RecvError::Closed) => {
428                                deltas_closed = true;
429                            }
430                        }
431                    }
432                }
433            }
434            debug!("Event forwarding task ended for session: {}", session_id);
435        });
436
437        Ok(Response::new(ReceiverStream::new(rx)))
438    }
439
440    async fn create_session(
441        &self,
442        request: Request<CreateSessionRequest>,
443    ) -> Result<Response<CreateSessionResponse>, Status> {
444        let req = request.into_inner();
445
446        let default_model_spec = req
447            .default_model
448            .ok_or_else(|| Status::invalid_argument("Missing required default_model"))?;
449        let default_model = proto_to_model(&default_model_spec)
450            .map_err(|e| Status::invalid_argument(format!("Invalid default_model: {e}")))?;
451
452        let tool_config = req
453            .tool_config
454            .map(proto_to_tool_config)
455            .unwrap_or_default();
456
457        let workspace_config = req
458            .workspace_config
459            .map(proto_to_workspace_config)
460            .unwrap_or_default();
461
462        let policy_overrides = proto_to_session_policy_overrides(req.policy_overrides);
463
464        let mut workspace_id = None;
465        let mut workspace_ref = None;
466        let mut repo_ref = None;
467        let parent_session_id = None;
468        let mut workspace_name = None;
469
470        if repo_ref.is_none()
471            && let steer_core::session::state::WorkspaceConfig::Local { path } = &workspace_config
472        {
473            match self
474                .repo_manager
475                .resolve_repo(steer_workspace::EnvironmentId::local(), path)
476                .await
477            {
478                Ok(repo_info) => {
479                    repo_ref = Some(steer_workspace::RepoRef {
480                        environment_id: repo_info.environment_id,
481                        repo_id: repo_info.repo_id,
482                        root_path: repo_info.root_path.clone(),
483                        vcs_kind: repo_info.vcs_kind,
484                    });
485                    workspace_name = repo_info
486                        .root_path
487                        .file_name()
488                        .map(|n| n.to_string_lossy().into_owned());
489                }
490                Err(err) => {
491                    warn!("Failed to resolve repo for session: {}", err);
492                }
493            }
494        }
495
496        if workspace_id.is_none()
497            && workspace_ref.is_none()
498            && let steer_core::session::state::WorkspaceConfig::Local { path } = &workspace_config
499            && let Ok(info) = self.workspace_manager.resolve_workspace(path).await
500        {
501            workspace_id = Some(info.workspace_id);
502            workspace_ref = Some(steer_workspace::WorkspaceRef {
503                environment_id: info.environment_id,
504                workspace_id: info.workspace_id,
505                repo_id: info.repo_id,
506            });
507            workspace_name.clone_from(&info.name);
508        }
509
510        let session_config = SessionConfig {
511            workspace: workspace_config,
512            workspace_ref,
513            workspace_id,
514            repo_ref,
515            parent_session_id,
516            workspace_name,
517            tool_config,
518            system_prompt: None,
519            primary_agent_id: req.primary_agent_id,
520            policy_overrides,
521            metadata: req.metadata,
522            default_model,
523            auto_compaction: req
524                .auto_compaction
525                .map(|ac| steer_core::session::state::AutoCompactionConfig {
526                    enabled: ac.enabled,
527                    threshold_percent: ac.threshold_percent,
528                })
529                .unwrap_or_default(),
530        };
531
532        match self.runtime.create_session(session_config.clone()).await {
533            Ok(session_id) => {
534                if let Err(e) = self
535                    .catalog
536                    .update_session_catalog(session_id, Some(&session_config), false, None)
537                    .await
538                {
539                    error!("Failed to update session catalog: {}", e);
540                    return Err(Status::internal(format!(
541                        "Failed to update session catalog: {e}"
542                    )));
543                }
544
545                let session_info = SessionInfo {
546                    id: session_id.to_string(),
547                    created_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
548                    updated_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
549                    status: proto::SessionStatus::Active as i32,
550                    metadata: None,
551                };
552                Ok(Response::new(CreateSessionResponse {
553                    session: Some(session_info),
554                }))
555            }
556            Err(e) => {
557                error!("Failed to create session: {}", e);
558                Err(Status::internal(format!("Failed to create session: {e}")))
559            }
560        }
561    }
562
563    async fn list_sessions(
564        &self,
565        _request: Request<ListSessionsRequest>,
566    ) -> Result<Response<ListSessionsResponse>, Status> {
567        let filter = SessionFilter::default();
568
569        match self.catalog.list_sessions(filter).await {
570            Ok(sessions) => {
571                let proto_sessions = sessions
572                    .into_iter()
573                    .map(|s| SessionInfo {
574                        id: s.id.to_string(),
575                        created_at: Some(prost_types::Timestamp::from(
576                            std::time::SystemTime::from(s.created_at),
577                        )),
578                        updated_at: Some(prost_types::Timestamp::from(
579                            std::time::SystemTime::from(s.updated_at),
580                        )),
581                        status: proto::SessionStatus::Active as i32,
582                        metadata: None,
583                    })
584                    .collect();
585
586                Ok(Response::new(ListSessionsResponse {
587                    sessions: proto_sessions,
588                    next_page_token: None,
589                }))
590            }
591            Err(e) => {
592                error!("Failed to list sessions: {}", e);
593                Err(Status::internal(format!("Failed to list sessions: {e}")))
594            }
595        }
596    }
597
598    async fn get_session(
599        &self,
600        request: Request<GetSessionRequest>,
601    ) -> Result<Response<Self::GetSessionStream>, Status> {
602        let req = request.into_inner();
603        let session_id = Self::parse_session_id(&req.session_id)?;
604        let runtime = self.runtime.clone();
605        let catalog = self.catalog.clone();
606
607        let stream = async_stream::try_stream! {
608            if let Err(e) = runtime.resume_session(session_id).await
609                && matches!(e, RuntimeError::SessionNotFound { .. }) {
610                    Err(Status::not_found(format!("Session not found: {session_id}")))?;
611                    return;
612                }
613
614            let state = runtime.get_session_state(session_id).await
615                .map_err(|e| Status::internal(format!("Failed to get session state: {e}")))?;
616
617            let config = catalog.get_session_config(session_id).await
618                .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?;
619
620            yield GetSessionResponse {
621                chunk: Some(get_session_response::Chunk::Header(SessionStateHeader {
622                    id: session_id.to_string(),
623                    created_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
624                    updated_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
625                    config: config.map(|c| crate::grpc::conversions::session_config_to_proto(&c)),
626                    last_event_sequence: state.event_sequence,
627                })),
628            };
629
630            for message in state.message_graph.messages {
631                let proto_msg = message_to_proto(message)
632                    .map_err(|e| Status::internal(format!("Failed to convert message: {e}")))?;
633                yield GetSessionResponse {
634                    chunk: Some(get_session_response::Chunk::Message(proto_msg)),
635                };
636            }
637
638            yield GetSessionResponse {
639                chunk: Some(get_session_response::Chunk::Footer(SessionStateFooter {
640                    approved_tools: state.approved_tools.into_iter().collect(),
641                    metadata: std::collections::HashMap::new(),
642                    queued_head: state
643                        .queued_work
644                        .front()
645                        .map(|item| match item {
646                            steer_core::app::domain::state::QueuedWorkItem::UserMessage(message) => {
647                                proto::QueuedWorkItem {
648                                    kind: proto::queued_work_item::Kind::UserMessage as i32,
649                                    content: message
650                                        .content
651                                        .iter()
652                                        .filter_map(|item| match item {
653                                            UserContent::Text { text } => Some(text.as_str()),
654                                            _ => None,
655                                        })
656                                        .collect::<Vec<_>>()
657                                        .join("\n"),
658                                    model: Some(model_to_proto(message.model.clone())),
659                                    queued_at: message.queued_at,
660                                    op_id: message.op_id.to_string(),
661                                    message_id: message.message_id.to_string(),
662                                    attachment_count: message
663                                        .content
664                                        .iter()
665                                        .filter(|item| matches!(item, UserContent::Image { .. }))
666                                        .count() as u32,
667                                }
668                            }
669                            steer_core::app::domain::state::QueuedWorkItem::DirectBash(command) => {
670                                proto::QueuedWorkItem {
671                                    kind: proto::queued_work_item::Kind::DirectBash as i32,
672                                    content: command.command.clone(),
673                                    model: None,
674                                    queued_at: command.queued_at,
675                                    op_id: command.op_id.to_string(),
676                                    message_id: command.message_id.to_string(),
677                                    attachment_count: 0,
678                                }
679                            }
680                        }),
681                    queued_count: state.queued_work.len() as u32,
682                })),
683            };
684        };
685
686        Ok(Response::new(Box::pin(stream)))
687    }
688
689    async fn delete_session(
690        &self,
691        request: Request<DeleteSessionRequest>,
692    ) -> Result<Response<DeleteSessionResponse>, Status> {
693        let req = request.into_inner();
694        let session_id = Self::parse_session_id(&req.session_id)?;
695
696        match self.runtime.delete_session(session_id).await {
697            Ok(()) => Ok(Response::new(DeleteSessionResponse {})),
698            Err(RuntimeError::SessionNotFound { .. }) => Err(Status::not_found(format!(
699                "Session not found: {}",
700                req.session_id
701            ))),
702            Err(e) => {
703                error!("Failed to delete session: {}", e);
704                Err(Status::internal(format!("Failed to delete session: {e}")))
705            }
706        }
707    }
708
709    async fn get_conversation(
710        &self,
711        request: Request<GetConversationRequest>,
712    ) -> Result<Response<Self::GetConversationStream>, Status> {
713        let req = request.into_inner();
714        let session_id = Self::parse_session_id(&req.session_id)?;
715        let runtime = self.runtime.clone();
716
717        info!("GetConversation called for session: {}", session_id);
718
719        let stream = async_stream::try_stream! {
720            if let Err(e) = runtime.resume_session(session_id).await
721                && matches!(e, RuntimeError::SessionNotFound { .. }) {
722                    Err(Status::not_found(format!("Session not found: {session_id}")))?;
723                    return;
724                }
725
726            let state = runtime.get_session_state(session_id).await
727                .map_err(|e| Status::internal(format!("Failed to get session state: {e}")))?;
728
729            info!(
730                "Found session state with {} messages and {} approved tools",
731                state.message_graph.messages.len(),
732                state.approved_tools.len()
733            );
734
735            for msg in state.message_graph.messages {
736                let proto_msg = message_to_proto(msg)
737                    .map_err(|e| Status::internal(format!("Failed to convert message: {e}")))?;
738                yield GetConversationResponse {
739                    chunk: Some(get_conversation_response::Chunk::Message(proto_msg)),
740                };
741            }
742
743            yield GetConversationResponse {
744                chunk: Some(get_conversation_response::Chunk::Footer(GetConversationFooter {
745                    approved_tools: state.approved_tools.into_iter().collect(),
746                    compaction_summary_ids: state.compaction_summary_ids.into_iter().collect(),
747                })),
748            };
749        };
750
751        Ok(Response::new(Box::pin(stream)))
752    }
753
754    async fn send_message(
755        &self,
756        request: Request<SendMessageRequest>,
757    ) -> Result<Response<SendMessageResponse>, Status> {
758        let req = request.into_inner();
759        let session_id = Self::parse_session_id(&req.session_id)?;
760
761        let model = if let Some(model_spec) = req.model {
762            proto_to_model(&model_spec)
763                .map_err(|e| Status::invalid_argument(format!("Invalid model spec: {e}")))?
764        } else {
765            let config = self
766                .catalog
767                .get_session_config(session_id)
768                .await
769                .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?
770                .ok_or_else(|| Status::not_found("Session config not found"))?;
771            config.default_model
772        };
773
774        let user_content: Vec<UserContent> = req
775            .content
776            .into_iter()
777            .filter_map(|item| match item.content {
778                Some(proto::user_content::Content::Text(text)) => Some(UserContent::Text { text }),
779                Some(proto::user_content::Content::CommandExecution(cmd)) => {
780                    Some(UserContent::CommandExecution {
781                        command: cmd.command,
782                        stdout: cmd.stdout,
783                        stderr: cmd.stderr,
784                        exit_code: cmd.exit_code,
785                    })
786                }
787                Some(proto::user_content::Content::Image(image)) => {
788                    let source = image.source.map(|source| match source {
789                        proto::image_content::Source::SessionFile(file) => {
790                            steer_core::app::conversation::ImageSource::SessionFile {
791                                relative_path: file.relative_path,
792                            }
793                        }
794                        proto::image_content::Source::DataUrl(data_url) => {
795                            steer_core::app::conversation::ImageSource::DataUrl {
796                                data_url: data_url.data_url,
797                            }
798                        }
799                        proto::image_content::Source::Url(url) => {
800                            steer_core::app::conversation::ImageSource::Url { url: url.url }
801                        }
802                    });
803
804                    source.map(|source| UserContent::Image {
805                        image: steer_core::app::conversation::ImageContent {
806                            mime_type: image.mime_type,
807                            source,
808                            width: image.width,
809                            height: image.height,
810                            bytes: image.bytes,
811                            sha256: image.sha256,
812                        },
813                    })
814                }
815                None => None,
816            })
817            .collect();
818
819        let fallback_text = req.message;
820        let content = if user_content.is_empty() {
821            vec![UserContent::Text {
822                text: fallback_text,
823            }]
824        } else {
825            user_content
826        };
827
828        let has_text = content
829            .iter()
830            .any(|item| matches!(item, UserContent::Text { text } if !text.trim().is_empty()));
831
832        let has_non_text = content
833            .iter()
834            .any(|item| !matches!(item, UserContent::Text { .. }));
835
836        if !has_text && !has_non_text {
837            return Err(Status::invalid_argument("Input text cannot be empty"));
838        }
839
840        match self
841            .runtime
842            .submit_user_input(session_id, content, model)
843            .await
844        {
845            Ok(op_id) => Ok(Response::new(SendMessageResponse {
846                operation: Some(Operation {
847                    id: op_id.to_string(),
848                    session_id: session_id.to_string(),
849                    r#type: OperationType::SendMessage as i32,
850                    status: OperationStatus::Running as i32,
851                    created_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
852                    completed_at: None,
853                    metadata: std::collections::HashMap::new(),
854                }),
855            })),
856            Err(RuntimeError::InvalidInput { message }) => Err(Status::invalid_argument(message)),
857            Err(e) => {
858                error!("Failed to send message: {}", e);
859                Err(Status::internal(format!("Failed to send message: {e}")))
860            }
861        }
862    }
863
864    async fn edit_message(
865        &self,
866        request: Request<EditMessageRequest>,
867    ) -> Result<Response<EditMessageResponse>, Status> {
868        let req = request.into_inner();
869        let session_id = Self::parse_session_id(&req.session_id)?;
870
871        let model = if let Some(model_spec) = req.model {
872            proto_to_model(&model_spec)
873                .map_err(|e| Status::invalid_argument(format!("Invalid model spec: {e}")))?
874        } else {
875            let config = self
876                .catalog
877                .get_session_config(session_id)
878                .await
879                .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?
880                .ok_or_else(|| Status::not_found("Session config not found"))?;
881            config.default_model
882        };
883
884        let user_content: Vec<UserContent> = req
885            .content
886            .into_iter()
887            .filter_map(|item| match item.content {
888                Some(proto::user_content::Content::Text(text)) => Some(UserContent::Text { text }),
889                Some(proto::user_content::Content::CommandExecution(cmd)) => {
890                    Some(UserContent::CommandExecution {
891                        command: cmd.command,
892                        stdout: cmd.stdout,
893                        stderr: cmd.stderr,
894                        exit_code: cmd.exit_code,
895                    })
896                }
897                Some(proto::user_content::Content::Image(image)) => {
898                    let source = image.source.map(|source| match source {
899                        proto::image_content::Source::SessionFile(file) => {
900                            steer_core::app::conversation::ImageSource::SessionFile {
901                                relative_path: file.relative_path,
902                            }
903                        }
904                        proto::image_content::Source::DataUrl(data_url) => {
905                            steer_core::app::conversation::ImageSource::DataUrl {
906                                data_url: data_url.data_url,
907                            }
908                        }
909                        proto::image_content::Source::Url(url) => {
910                            steer_core::app::conversation::ImageSource::Url { url: url.url }
911                        }
912                    });
913
914                    source.map(|source| UserContent::Image {
915                        image: steer_core::app::conversation::ImageContent {
916                            mime_type: image.mime_type,
917                            source,
918                            width: image.width,
919                            height: image.height,
920                            bytes: image.bytes,
921                            sha256: image.sha256,
922                        },
923                    })
924                }
925                None => None,
926            })
927            .collect();
928
929        let content = if user_content.is_empty() {
930            vec![UserContent::Text {
931                text: req.new_content,
932            }]
933        } else {
934            user_content
935        };
936
937        self.runtime
938            .submit_edited_message(session_id, req.message_id, content, model)
939            .await
940            .map_err(|e| match e {
941                RuntimeError::InvalidInput { message } => Status::failed_precondition(message),
942                other => Status::internal(format!("Failed to edit message: {other}")),
943            })?;
944
945        Ok(Response::new(EditMessageResponse {}))
946    }
947
948    async fn dequeue_queued_item(
949        &self,
950        request: Request<DequeueQueuedItemRequest>,
951    ) -> Result<Response<DequeueQueuedItemResponse>, Status> {
952        let req = request.into_inner();
953        let session_id = Self::parse_session_id(&req.session_id)?;
954
955        self.runtime
956            .submit_dequeue_queued_item(session_id)
957            .await
958            .map_err(|e| match e {
959                RuntimeError::InvalidInput { message } => Status::failed_precondition(message),
960                other => Status::internal(format!("Failed to dequeue queued item: {other}")),
961            })?;
962
963        Ok(Response::new(DequeueQueuedItemResponse {}))
964    }
965
966    async fn approve_tool(
967        &self,
968        request: Request<ApproveToolRequest>,
969    ) -> Result<Response<ApproveToolResponse>, Status> {
970        let req = request.into_inner();
971        let session_id = Self::parse_session_id(&req.session_id)?;
972
973        let request_id = Uuid::parse_str(&req.tool_call_id)
974            .map(steer_core::app::domain::types::RequestId::from)
975            .map_err(|_| Status::invalid_argument("Invalid tool call ID"))?;
976
977        let (approved, remember) = match req.decision {
978            Some(decision) => match decision.decision_type {
979                Some(proto::approval_decision::DecisionType::Deny(_)) => (false, None),
980                Some(proto::approval_decision::DecisionType::Once(_)) => (true, None),
981                Some(proto::approval_decision::DecisionType::AlwaysTool(_)) => (
982                    true,
983                    Some(steer_core::app::domain::action::ApprovalMemory::PendingTool),
984                ),
985                Some(proto::approval_decision::DecisionType::AlwaysBashPattern(pattern)) => (
986                    true,
987                    Some(steer_core::app::domain::action::ApprovalMemory::BashPattern(pattern)),
988                ),
989                None => {
990                    return Err(Status::invalid_argument("Invalid approval decision"));
991                }
992            },
993            None => {
994                return Err(Status::invalid_argument("Missing approval decision"));
995            }
996        };
997
998        match self
999            .runtime
1000            .submit_tool_approval(session_id, request_id, approved, remember)
1001            .await
1002        {
1003            Ok(()) => Ok(Response::new(ApproveToolResponse {})),
1004            Err(e) => {
1005                error!("Failed to approve tool: {}", e);
1006                Err(Status::internal(format!("Failed to approve tool: {e}")))
1007            }
1008        }
1009    }
1010
1011    async fn switch_primary_agent(
1012        &self,
1013        request: Request<SwitchPrimaryAgentRequest>,
1014    ) -> Result<Response<SwitchPrimaryAgentResponse>, Status> {
1015        let req = request.into_inner();
1016        let session_id = Self::parse_session_id(&req.session_id)?;
1017
1018        self.runtime
1019            .switch_primary_agent(session_id, req.primary_agent_id)
1020            .await
1021            .map_err(|e| match e {
1022                RuntimeError::InvalidInput { message } => {
1023                    if message.contains("operation is active") {
1024                        Status::failed_precondition(message)
1025                    } else {
1026                        Status::invalid_argument(message)
1027                    }
1028                }
1029                other => Status::internal(format!("Failed to switch primary agent: {other}")),
1030            })?;
1031
1032        Ok(Response::new(SwitchPrimaryAgentResponse {}))
1033    }
1034
1035    async fn cancel_operation(
1036        &self,
1037        request: Request<CancelOperationRequest>,
1038    ) -> Result<Response<CancelOperationResponse>, Status> {
1039        let req = request.into_inner();
1040        let session_id = Self::parse_session_id(&req.session_id)?;
1041
1042        match self.runtime.cancel_operation(session_id, None).await {
1043            Ok(()) => Ok(Response::new(CancelOperationResponse {})),
1044            Err(e) => {
1045                error!("Failed to cancel operation: {}", e);
1046                Err(Status::internal(format!("Failed to cancel operation: {e}")))
1047            }
1048        }
1049    }
1050
1051    async fn compact_session(
1052        &self,
1053        request: Request<CompactSessionRequest>,
1054    ) -> Result<Response<CompactSessionResponse>, Status> {
1055        let req = request.into_inner();
1056        let session_id = Self::parse_session_id(&req.session_id)?;
1057        let model_spec = req
1058            .model
1059            .ok_or_else(|| Status::invalid_argument("Missing model spec"))?;
1060        let model = proto_to_model(&model_spec)
1061            .map_err(|e| Status::invalid_argument(format!("Invalid model spec: {e}")))?;
1062
1063        self.runtime
1064            .compact_session(session_id, model)
1065            .await
1066            .map_err(|e| match e {
1067                RuntimeError::InvalidInput { message } => Status::failed_precondition(message),
1068                other => Status::internal(format!("Failed to compact session: {other}")),
1069            })?;
1070
1071        Ok(Response::new(CompactSessionResponse {}))
1072    }
1073
1074    async fn execute_bash_command(
1075        &self,
1076        request: Request<ExecuteBashCommandRequest>,
1077    ) -> Result<Response<ExecuteBashCommandResponse>, Status> {
1078        let req = request.into_inner();
1079        let session_id = Self::parse_session_id(&req.session_id)?;
1080
1081        self.runtime
1082            .execute_bash_command(session_id, req.command)
1083            .await
1084            .map_err(|e| Status::internal(format!("Failed to execute bash command: {e}")))?;
1085
1086        Ok(Response::new(ExecuteBashCommandResponse {}))
1087    }
1088
1089    async fn list_files(
1090        &self,
1091        request: Request<ListFilesRequest>,
1092    ) -> Result<Response<Self::ListFilesStream>, Status> {
1093        let req = request.into_inner();
1094        let session_id = Self::parse_session_id(&req.session_id)?;
1095
1096        debug!("ListFiles called for session: {}", session_id);
1097
1098        let config = self
1099            .catalog
1100            .get_session_config(session_id)
1101            .await
1102            .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?
1103            .ok_or_else(|| Status::not_found(format!("Session not found: {session_id}")))?;
1104
1105        let workspace =
1106            steer_core::workspace::create_workspace(&config.workspace.to_workspace_config())
1107                .await
1108                .map_err(|e| Status::internal(format!("Failed to create workspace: {e}")))?;
1109
1110        let (tx, rx) = mpsc::channel(100);
1111
1112        let _list_task: tokio::task::JoinHandle<()> = tokio::spawn(async move {
1113            let query = if req.query.is_empty() {
1114                None
1115            } else {
1116                Some(req.query.as_str())
1117            };
1118
1119            let max_results = if req.max_results == 0 {
1120                None
1121            } else {
1122                Some(req.max_results as usize)
1123            };
1124
1125            match workspace.list_files(query, max_results).await {
1126                Ok(files) => {
1127                    for chunk in files.chunks(1000) {
1128                        let response = ListFilesResponse {
1129                            paths: chunk.to_vec(),
1130                        };
1131
1132                        if let Err(e) = tx.send(Ok(response)).await {
1133                            warn!("Failed to send file list chunk: {}", e);
1134                            break;
1135                        }
1136                    }
1137                }
1138                Err(e) => {
1139                    error!("Failed to list files: {}", e);
1140                    let _ = tx
1141                        .send(Err(Status::internal(format!("Failed to list files: {e}"))))
1142                        .await;
1143                }
1144            }
1145        });
1146
1147        Ok(Response::new(ReceiverStream::new(rx)))
1148    }
1149
1150    async fn get_mcp_servers(
1151        &self,
1152        request: Request<GetMcpServersRequest>,
1153    ) -> Result<Response<GetMcpServersResponse>, Status> {
1154        let req = request.into_inner();
1155        let session_id = Self::parse_session_id(&req.session_id)?;
1156
1157        debug!("GetMcpServers called for session: {}", session_id);
1158
1159        let state = self
1160            .runtime
1161            .get_session_state(session_id)
1162            .await
1163            .map_err(|e| Status::internal(format!("Failed to get session state: {e}")))?;
1164
1165        let config = self
1166            .catalog
1167            .get_session_config(session_id)
1168            .await
1169            .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?;
1170
1171        let transport_map: std::collections::HashMap<String, &steer_core::tools::McpTransport> =
1172            config
1173                .as_ref()
1174                .map(|c| {
1175                    c.tool_config
1176                        .backends
1177                        .iter()
1178                        .map(|b| {
1179                            let steer_core::session::state::BackendConfig::Mcp {
1180                                server_name,
1181                                transport,
1182                                ..
1183                            } = b;
1184                            (server_name.clone(), transport)
1185                        })
1186                        .collect()
1187                })
1188                .unwrap_or_default();
1189
1190        let servers: Vec<proto::McpServerInfo> = state
1191            .mcp_servers
1192            .into_iter()
1193            .map(|(name, mcp_state)| {
1194                use crate::grpc::conversions::mcp_transport_to_proto;
1195                use steer_core::app::domain::action::McpServerState;
1196
1197                let state = match mcp_state {
1198                    McpServerState::Connecting => proto::McpConnectionState {
1199                        state: Some(proto::mcp_connection_state::State::Connecting(
1200                            proto::McpConnecting {},
1201                        )),
1202                    },
1203                    McpServerState::Connected { tools } => {
1204                        let tool_names = tools.iter().map(|t| t.name.clone()).collect();
1205                        proto::McpConnectionState {
1206                            state: Some(proto::mcp_connection_state::State::Connected(
1207                                proto::McpConnected { tool_names },
1208                            )),
1209                        }
1210                    }
1211                    McpServerState::Disconnected { error } => {
1212                        let error_msg = error.unwrap_or_else(|| "Disconnected".to_string());
1213                        proto::McpConnectionState {
1214                            state: Some(proto::mcp_connection_state::State::Failed(
1215                                proto::McpFailed { error: error_msg },
1216                            )),
1217                        }
1218                    }
1219                    McpServerState::Failed { error } => proto::McpConnectionState {
1220                        state: Some(proto::mcp_connection_state::State::Failed(
1221                            proto::McpFailed { error },
1222                        )),
1223                    },
1224                };
1225
1226                proto::McpServerInfo {
1227                    server_name: name.clone(),
1228                    transport: transport_map.get(&name).map(|t| mcp_transport_to_proto(t)),
1229                    state: Some(state),
1230                    last_updated: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
1231                }
1232            })
1233            .collect();
1234
1235        Ok(Response::new(GetMcpServersResponse { servers }))
1236    }
1237
1238    async fn list_providers(
1239        &self,
1240        _request: Request<ListProvidersRequest>,
1241    ) -> Result<Response<ListProvidersResponse>, Status> {
1242        let providers = self
1243            .provider_registry
1244            .all()
1245            .map(|p| proto::ProviderInfo {
1246                id: p.id.storage_key(),
1247                name: p.name.clone(),
1248            })
1249            .collect();
1250
1251        Ok(Response::new(ListProvidersResponse { providers }))
1252    }
1253
1254    async fn list_models(
1255        &self,
1256        request: Request<ListModelsRequest>,
1257    ) -> Result<Response<ListModelsResponse>, Status> {
1258        let req = request.into_inner();
1259
1260        let mut auth_sources: HashMap<steer_core::config::provider::ProviderId, AuthSource> =
1261            HashMap::new();
1262        let mut visibility_policies: HashMap<
1263            steer_core::config::provider::ProviderId,
1264            Option<Arc<dyn ModelVisibilityPolicy>>,
1265        > = HashMap::new();
1266
1267        let mut all_models = Vec::new();
1268
1269        for model in self.model_registry.recommended() {
1270            if let Some(ref provider_id) = req.provider_id
1271                && model.provider.storage_key() != *provider_id
1272            {
1273                continue;
1274            }
1275
1276            let provider_id = model.provider.clone();
1277
1278            let auth_source = if let Some(source) = auth_sources.get(&provider_id) {
1279                source.clone()
1280            } else {
1281                let source = match self
1282                    .llm_config_provider
1283                    .resolve_auth_source(&provider_id)
1284                    .await
1285                {
1286                    Ok(source) => source,
1287                    Err(err) => {
1288                        warn!(
1289                            "Failed to resolve auth source for provider {}: {err}",
1290                            provider_id.as_str()
1291                        );
1292                        AuthSource::None
1293                    }
1294                };
1295                auth_sources.insert(provider_id.clone(), source.clone());
1296                source
1297            };
1298
1299            let policy = visibility_policies
1300                .entry(provider_id.clone())
1301                .or_insert_with(|| {
1302                    self.llm_config_provider
1303                        .plugin_registry()
1304                        .get(&provider_id)
1305                        .and_then(|plugin| plugin.model_visibility().map(Arc::from))
1306                });
1307
1308            if let Some(policy) = policy {
1309                let auth_model_id = AuthModelId {
1310                    provider_id: AuthProviderId(provider_id.as_str().to_string()),
1311                    model_id: model.id.clone(),
1312                };
1313                if !policy.allow_model(&auth_model_id, &auth_source) {
1314                    continue;
1315                }
1316            }
1317
1318            all_models.push(proto::ProviderModel {
1319                provider_id: model.provider.storage_key(),
1320                model_id: model.id.clone(),
1321                display_name: model
1322                    .display_name
1323                    .clone()
1324                    .unwrap_or_else(|| model.id.clone()),
1325                supports_thinking: model
1326                    .parameters
1327                    .as_ref()
1328                    .and_then(|p| p.thinking_config.as_ref())
1329                    .is_some_and(|tc| tc.enabled),
1330                aliases: model.aliases.clone(),
1331                context_window_tokens: model.context_window_tokens,
1332            });
1333        }
1334
1335        Ok(Response::new(ListModelsResponse { models: all_models }))
1336    }
1337
1338    async fn get_provider_auth_status(
1339        &self,
1340        request: Request<proto::GetProviderAuthStatusRequest>,
1341    ) -> Result<Response<proto::GetProviderAuthStatusResponse>, Status> {
1342        let req = request.into_inner();
1343
1344        let mut statuses = Vec::new();
1345        for p in self.provider_registry.all() {
1346            if let Some(ref filter) = req.provider_id
1347                && &p.id.storage_key() != filter
1348            {
1349                continue;
1350            }
1351            let auth_source = self
1352                .llm_config_provider
1353                .resolve_auth_source(&p.id)
1354                .await
1355                .map_err(|e| Status::internal(format!("auth lookup failed: {e}")))?;
1356            let auth_source = crate::grpc::conversions::auth_source_to_proto(auth_source);
1357            statuses.push(proto::ProviderAuthStatus {
1358                provider_id: p.id.storage_key(),
1359                auth_source: Some(auth_source),
1360            });
1361        }
1362
1363        Ok(Response::new(proto::GetProviderAuthStatusResponse {
1364            statuses,
1365        }))
1366    }
1367
1368    async fn start_auth(
1369        &self,
1370        request: Request<proto::StartAuthRequest>,
1371    ) -> Result<Response<proto::StartAuthResponse>, Status> {
1372        self.auth_flow_manager.cleanup().await;
1373        let req = request.into_inner();
1374        let provider_id = steer_core::config::provider::ProviderId(req.provider_id);
1375
1376        let (flow, method) = self.create_auth_flow(&provider_id)?;
1377        let state = flow
1378            .start_auth(method)
1379            .await
1380            .map_err(|e| Status::internal(format!("auth start failed: {e}")))?;
1381        let progress = flow
1382            .get_initial_progress(&state, method)
1383            .await
1384            .map_err(|e| Status::internal(format!("auth progress failed: {e}")))?;
1385
1386        let flow_id = Uuid::new_v4().to_string();
1387        self.auth_flow_manager
1388            .insert(
1389                flow_id.clone(),
1390                AuthFlowEntry {
1391                    flow,
1392                    state,
1393                    last_updated: Instant::now(),
1394                },
1395            )
1396            .await;
1397
1398        Ok(Response::new(proto::StartAuthResponse {
1399            flow_id,
1400            progress: Some(crate::grpc::conversions::auth_progress_to_proto(progress)),
1401        }))
1402    }
1403
1404    async fn send_auth_input(
1405        &self,
1406        request: Request<proto::SendAuthInputRequest>,
1407    ) -> Result<Response<proto::SendAuthInputResponse>, Status> {
1408        self.auth_flow_manager.cleanup().await;
1409        let req = request.into_inner();
1410        let flow_id = req.flow_id.clone();
1411
1412        let mut entry = self
1413            .auth_flow_manager
1414            .take(&flow_id)
1415            .await
1416            .ok_or_else(|| Status::not_found("Auth flow not found"))?;
1417
1418        let progress = entry
1419            .flow
1420            .handle_input(&mut entry.state, &req.input)
1421            .await
1422            .map_err(|e| Status::internal(format!("auth input failed: {e}")))?;
1423
1424        let done = matches!(
1425            progress,
1426            steer_core::auth::AuthProgress::Complete | steer_core::auth::AuthProgress::Error(_)
1427        );
1428
1429        if !done {
1430            entry.last_updated = Instant::now();
1431            self.auth_flow_manager.insert(flow_id, entry).await;
1432        }
1433
1434        Ok(Response::new(proto::SendAuthInputResponse {
1435            progress: Some(crate::grpc::conversions::auth_progress_to_proto(progress)),
1436        }))
1437    }
1438
1439    async fn get_auth_progress(
1440        &self,
1441        request: Request<proto::GetAuthProgressRequest>,
1442    ) -> Result<Response<proto::GetAuthProgressResponse>, Status> {
1443        self.auth_flow_manager.cleanup().await;
1444        let req = request.into_inner();
1445        let flow_id = req.flow_id.clone();
1446
1447        let mut entry = self
1448            .auth_flow_manager
1449            .take(&flow_id)
1450            .await
1451            .ok_or_else(|| Status::not_found("Auth flow not found"))?;
1452
1453        let progress = entry
1454            .flow
1455            .handle_input(&mut entry.state, "")
1456            .await
1457            .map_err(|e| Status::internal(format!("auth progress failed: {e}")))?;
1458
1459        let done = matches!(
1460            progress,
1461            steer_core::auth::AuthProgress::Complete | steer_core::auth::AuthProgress::Error(_)
1462        );
1463
1464        if !done {
1465            entry.last_updated = Instant::now();
1466            self.auth_flow_manager.insert(flow_id, entry).await;
1467        }
1468
1469        Ok(Response::new(proto::GetAuthProgressResponse {
1470            progress: Some(crate::grpc::conversions::auth_progress_to_proto(progress)),
1471        }))
1472    }
1473
1474    async fn cancel_auth(
1475        &self,
1476        request: Request<proto::CancelAuthRequest>,
1477    ) -> Result<Response<proto::CancelAuthResponse>, Status> {
1478        self.auth_flow_manager.cleanup().await;
1479        let req = request.into_inner();
1480        let flow_id = req.flow_id;
1481
1482        let _ = self.auth_flow_manager.take(&flow_id).await;
1483
1484        Ok(Response::new(proto::CancelAuthResponse {}))
1485    }
1486
1487    async fn resolve_model(
1488        &self,
1489        request: Request<proto::ResolveModelRequest>,
1490    ) -> Result<Response<proto::ResolveModelResponse>, Status> {
1491        let req = request.into_inner();
1492
1493        match self.model_registry.resolve(&req.input) {
1494            Ok(model_id) => {
1495                let steer_core::config::model::ModelId { provider, id } = model_id;
1496                let model_spec = proto::ModelSpec {
1497                    provider_id: provider.storage_key(),
1498                    model_id: id,
1499                };
1500                Ok(Response::new(proto::ResolveModelResponse {
1501                    model: Some(model_spec),
1502                }))
1503            }
1504            Err(e) => Err(Status::not_found(format!(
1505                "Failed to resolve model '{}': {}",
1506                req.input, e
1507            ))),
1508        }
1509    }
1510
1511    async fn get_default_model(
1512        &self,
1513        _request: Request<proto::GetDefaultModelRequest>,
1514    ) -> Result<Response<proto::GetDefaultModelResponse>, Status> {
1515        let model = self.select_default_model();
1516        Ok(Response::new(proto::GetDefaultModelResponse {
1517            model: Some(model_to_proto(model)),
1518        }))
1519    }
1520
1521    async fn create_workspace(
1522        &self,
1523        request: Request<proto::CreateWorkspaceRequest>,
1524    ) -> Result<Response<proto::CreateWorkspaceResponse>, Status> {
1525        let req = request.into_inner();
1526        let repo_id = Self::parse_repo_id(&req.repo_id)?;
1527        let parent_workspace_id = match req.parent_workspace_id {
1528            Some(value) => Some(Self::parse_workspace_id(&value)?),
1529            None => None,
1530        };
1531
1532        let strategy = match proto::WorkspaceCreateStrategy::try_from(req.strategy) {
1533            Ok(proto::WorkspaceCreateStrategy::JjWorkspace) => {
1534                steer_workspace::WorkspaceCreateStrategy::JjWorkspace
1535            }
1536            Ok(proto::WorkspaceCreateStrategy::GitWorktree) => {
1537                steer_workspace::WorkspaceCreateStrategy::GitWorktree
1538            }
1539            _ => {
1540                return Err(Status::invalid_argument(
1541                    "Unsupported workspace create strategy",
1542                ));
1543            }
1544        };
1545
1546        let request = steer_workspace::CreateWorkspaceRequest {
1547            repo_id,
1548            name: req.name,
1549            parent_workspace_id,
1550            strategy,
1551        };
1552
1553        let workspace = self
1554            .workspace_manager
1555            .create_workspace(request)
1556            .await
1557            .map_err(Self::workspace_manager_error_to_status)?;
1558
1559        Ok(Response::new(proto::CreateWorkspaceResponse {
1560            workspace: Some(workspace_info_to_proto(&workspace)),
1561        }))
1562    }
1563
1564    async fn resolve_repo(
1565        &self,
1566        request: Request<proto::ResolveRepoRequest>,
1567    ) -> Result<Response<proto::ResolveRepoResponse>, Status> {
1568        let req = request.into_inner();
1569        let environment_id = Self::parse_environment_id(&req.environment_id)?;
1570        let repo = self
1571            .repo_manager
1572            .resolve_repo(environment_id, std::path::Path::new(&req.path))
1573            .await
1574            .map_err(Self::workspace_manager_error_to_status)?;
1575
1576        Ok(Response::new(proto::ResolveRepoResponse {
1577            repo: Some(repo_info_to_proto(&repo)),
1578        }))
1579    }
1580
1581    async fn list_repos(
1582        &self,
1583        request: Request<proto::ListReposRequest>,
1584    ) -> Result<Response<proto::ListReposResponse>, Status> {
1585        let req = request.into_inner();
1586        let environment_id = Self::parse_environment_id(&req.environment_id)?;
1587        let repos = self
1588            .repo_manager
1589            .list_repos(environment_id)
1590            .await
1591            .map_err(Self::workspace_manager_error_to_status)?;
1592
1593        Ok(Response::new(proto::ListReposResponse {
1594            repos: repos.iter().map(repo_info_to_proto).collect(),
1595        }))
1596    }
1597
1598    async fn list_workspaces(
1599        &self,
1600        request: Request<proto::ListWorkspacesRequest>,
1601    ) -> Result<Response<proto::ListWorkspacesResponse>, Status> {
1602        let req = request.into_inner();
1603        let environment_id = Self::parse_environment_id(&req.environment_id)?;
1604
1605        let workspaces = self
1606            .workspace_manager
1607            .list_workspaces(steer_workspace::ListWorkspacesRequest { environment_id })
1608            .await
1609            .map_err(Self::workspace_manager_error_to_status)?;
1610
1611        Ok(Response::new(proto::ListWorkspacesResponse {
1612            workspaces: workspaces.iter().map(workspace_info_to_proto).collect(),
1613        }))
1614    }
1615
1616    async fn get_workspace_status(
1617        &self,
1618        request: Request<proto::GetWorkspaceStatusRequest>,
1619    ) -> Result<Response<proto::GetWorkspaceStatusResponse>, Status> {
1620        let req = request.into_inner();
1621        let workspace_id = Self::parse_workspace_id(&req.workspace_id)?;
1622
1623        let status = self
1624            .workspace_manager
1625            .get_workspace_status(workspace_id)
1626            .await
1627            .map_err(Self::workspace_manager_error_to_status)?;
1628
1629        Ok(Response::new(proto::GetWorkspaceStatusResponse {
1630            status: Some(workspace_status_to_proto(&status)),
1631        }))
1632    }
1633
1634    async fn delete_workspace(
1635        &self,
1636        request: Request<proto::DeleteWorkspaceRequest>,
1637    ) -> Result<Response<proto::DeleteWorkspaceResponse>, Status> {
1638        let req = request.into_inner();
1639        let workspace_id = Self::parse_workspace_id(&req.workspace_id)?;
1640
1641        self.workspace_manager
1642            .delete_workspace(steer_workspace::DeleteWorkspaceRequest { workspace_id })
1643            .await
1644            .map_err(Self::workspace_manager_error_to_status)?;
1645
1646        Ok(Response::new(proto::DeleteWorkspaceResponse {}))
1647    }
1648
1649    async fn create_environment(
1650        &self,
1651        request: Request<proto::CreateEnvironmentRequest>,
1652    ) -> Result<Response<proto::CreateEnvironmentResponse>, Status> {
1653        let req = request.into_inner();
1654        let request = steer_workspace::CreateEnvironmentRequest {
1655            root: req.root_path.map(std::path::PathBuf::from),
1656            name: req.name,
1657        };
1658
1659        let env = self
1660            .environment_manager
1661            .create_environment(request)
1662            .await
1663            .map_err(Self::environment_manager_error_to_status)?;
1664
1665        Ok(Response::new(proto::CreateEnvironmentResponse {
1666            environment: Some(environment_descriptor_to_proto(&env)),
1667        }))
1668    }
1669
1670    async fn get_environment(
1671        &self,
1672        request: Request<proto::GetEnvironmentRequest>,
1673    ) -> Result<Response<proto::GetEnvironmentResponse>, Status> {
1674        let req = request.into_inner();
1675        let environment_id = Self::parse_environment_id(&req.environment_id)?;
1676
1677        let env = self
1678            .environment_manager
1679            .get_environment(environment_id)
1680            .await
1681            .map_err(Self::environment_manager_error_to_status)?;
1682
1683        Ok(Response::new(proto::GetEnvironmentResponse {
1684            environment: Some(environment_descriptor_to_proto(&env)),
1685        }))
1686    }
1687
1688    async fn delete_environment(
1689        &self,
1690        request: Request<proto::DeleteEnvironmentRequest>,
1691    ) -> Result<Response<proto::DeleteEnvironmentResponse>, Status> {
1692        let req = request.into_inner();
1693        let environment_id = Self::parse_environment_id(&req.environment_id)?;
1694        let policy = match proto::EnvironmentDeletePolicy::try_from(req.policy) {
1695            Ok(proto::EnvironmentDeletePolicy::Soft) => {
1696                steer_workspace::EnvironmentDeletePolicy::Soft
1697            }
1698            Ok(proto::EnvironmentDeletePolicy::Hard) => {
1699                steer_workspace::EnvironmentDeletePolicy::Hard
1700            }
1701            _ => steer_workspace::EnvironmentDeletePolicy::Hard,
1702        };
1703
1704        self.environment_manager
1705            .delete_environment(environment_id, policy)
1706            .await
1707            .map_err(Self::environment_manager_error_to_status)?;
1708
1709        Ok(Response::new(proto::DeleteEnvironmentResponse {}))
1710    }
1711}