1use crate::grpc::conversions::{
2 environment_descriptor_to_proto, message_to_proto, model_to_proto, proto_to_model,
3 proto_to_session_policy_overrides, proto_to_tool_config, proto_to_workspace_config,
4 repo_info_to_proto, session_event_to_proto, stream_delta_to_proto, workspace_info_to_proto,
5 workspace_status_to_proto,
6};
7use std::cmp::Ordering as CmpOrdering;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::{Duration, Instant};
12use steer_core::app::conversation::UserContent;
13use steer_core::app::domain::runtime::{RuntimeError, RuntimeHandle};
14use steer_core::app::domain::session::{SessionCatalog, SessionFilter};
15use steer_core::app::domain::types::SessionId;
16use steer_core::auth::api_key::ApiKeyAuthFlow;
17use steer_core::auth::{
18 AuthFlowWrapper, AuthMethod, AuthSource, DynAuthenticationFlow, ModelId as AuthModelId,
19 ModelVisibilityPolicy, ProviderId as AuthProviderId,
20};
21use steer_core::session::state::SessionConfig;
22use steer_proto::agent::v1::{
23 self as proto, ApproveToolRequest, ApproveToolResponse, CancelOperationRequest,
24 CancelOperationResponse, CompactSessionRequest, CompactSessionResponse, CreateSessionRequest,
25 CreateSessionResponse, DeleteSessionRequest, DeleteSessionResponse, DequeueQueuedItemRequest,
26 DequeueQueuedItemResponse, EditMessageRequest, EditMessageResponse, ExecuteBashCommandRequest,
27 ExecuteBashCommandResponse, GetConversationFooter, GetConversationRequest,
28 GetConversationResponse, GetMcpServersRequest, GetMcpServersResponse, GetSessionRequest,
29 GetSessionResponse, ListFilesRequest, ListFilesResponse, ListModelsRequest, ListModelsResponse,
30 ListProvidersRequest, ListProvidersResponse, ListSessionsRequest, ListSessionsResponse,
31 Operation, OperationStatus, OperationType, SendMessageRequest, SendMessageResponse,
32 SessionEvent, SessionInfo, SessionStateFooter, SessionStateHeader,
33 SubscribeSessionEventsRequest, SwitchPrimaryAgentRequest, SwitchPrimaryAgentResponse,
34 agent_service_server, get_conversation_response, get_session_response,
35};
36use steer_workspace::{EnvironmentManager, RepoManager, WorkspaceManager};
37use tokio::sync::{Mutex, broadcast, mpsc};
38use tokio_stream::wrappers::ReceiverStream;
39use tonic::{Request, Response, Status};
40use tracing::{debug, error, info, warn};
41use uuid::Uuid;
42
43pub struct RuntimeAgentService {
44 runtime: RuntimeHandle,
45 catalog: Arc<dyn SessionCatalog>,
46 model_registry: Arc<steer_core::model_registry::ModelRegistry>,
47 provider_registry: Arc<steer_core::auth::ProviderRegistry>,
48 llm_config_provider: steer_core::config::LlmConfigProvider,
49 environment_manager: Arc<dyn EnvironmentManager>,
50 workspace_manager: Arc<dyn WorkspaceManager>,
51 repo_manager: Arc<dyn RepoManager>,
52 auth_flow_manager: Arc<AuthFlowManager>,
53}
54
55const AUTH_FLOW_TTL: Duration = Duration::from_secs(10 * 60);
56
57struct AuthFlowEntry {
58 flow: Arc<dyn DynAuthenticationFlow>,
59 state: Box<dyn std::any::Any + Send + Sync>,
60 last_updated: Instant,
61}
62
63#[derive(Default)]
64struct AuthFlowManager {
65 flows: Mutex<HashMap<String, AuthFlowEntry>>,
66}
67
68impl AuthFlowManager {
69 fn new() -> Self {
70 Self::default()
71 }
72
73 async fn insert(&self, flow_id: String, entry: AuthFlowEntry) {
74 let mut flows = self.flows.lock().await;
75 flows.insert(flow_id, entry);
76 }
77
78 async fn take(&self, flow_id: &str) -> Option<AuthFlowEntry> {
79 let mut flows = self.flows.lock().await;
80 flows.remove(flow_id)
81 }
82
83 async fn cleanup(&self) {
84 let mut flows = self.flows.lock().await;
85 flows.retain(|_, entry| entry.last_updated.elapsed() <= AUTH_FLOW_TTL);
86 }
87}
88
89pub struct RuntimeAgentDeps {
90 pub runtime: RuntimeHandle,
91 pub catalog: Arc<dyn SessionCatalog>,
92 pub llm_config_provider: steer_core::config::LlmConfigProvider,
93 pub model_registry: Arc<steer_core::model_registry::ModelRegistry>,
94 pub provider_registry: Arc<steer_core::auth::ProviderRegistry>,
95 pub environment_manager: Arc<dyn EnvironmentManager>,
96 pub workspace_manager: Arc<dyn WorkspaceManager>,
97 pub repo_manager: Arc<dyn RepoManager>,
98}
99
100impl RuntimeAgentService {
101 pub fn new(deps: RuntimeAgentDeps) -> Self {
102 Self {
103 runtime: deps.runtime,
104 catalog: deps.catalog,
105 llm_config_provider: deps.llm_config_provider,
106 model_registry: deps.model_registry,
107 provider_registry: deps.provider_registry,
108 environment_manager: deps.environment_manager,
109 workspace_manager: deps.workspace_manager,
110 repo_manager: deps.repo_manager,
111 auth_flow_manager: Arc::new(AuthFlowManager::new()),
112 }
113 }
114
115 #[expect(clippy::result_large_err)]
116 fn parse_session_id(session_id: &str) -> Result<SessionId, Status> {
117 Uuid::parse_str(session_id)
118 .map(SessionId::from)
119 .map_err(|_| Status::invalid_argument(format!("Invalid session ID: {session_id}")))
120 }
121 fn parse_environment_id(
122 environment_id: &str,
123 ) -> Result<steer_workspace::EnvironmentId, Status> {
124 if environment_id.is_empty() {
125 return Ok(steer_workspace::EnvironmentId::local());
126 }
127 let id = Uuid::parse_str(environment_id).map_err(|_| {
128 Status::invalid_argument(format!("Invalid environment ID: {environment_id}"))
129 })?;
130 Ok(steer_workspace::EnvironmentId::from_uuid(id))
131 }
132
133 fn parse_workspace_id(workspace_id: &str) -> Result<steer_workspace::WorkspaceId, Status> {
134 let id = Uuid::parse_str(workspace_id).map_err(|_| {
135 Status::invalid_argument(format!("Invalid workspace ID: {workspace_id}"))
136 })?;
137 Ok(steer_workspace::WorkspaceId::from_uuid(id))
138 }
139
140 fn parse_repo_id(repo_id: &str) -> Result<steer_workspace::RepoId, Status> {
141 let id = Uuid::parse_str(repo_id)
142 .map_err(|_| Status::invalid_argument(format!("Invalid repo ID: {repo_id}")))?;
143 Ok(steer_workspace::RepoId::from_uuid(id))
144 }
145
146 fn select_default_model(&self) -> steer_core::config::model::ModelId {
147 let builtin_default = steer_core::config::model::builtin::default_model();
148
149 if let Some(config) = self.model_registry.get(&builtin_default)
150 && config.recommended
151 {
152 return builtin_default;
153 }
154
155 let mut recommended: Vec<_> = self.model_registry.recommended().collect();
156 if recommended.is_empty() {
157 return builtin_default;
158 }
159
160 recommended.sort_by(|a, b| {
161 let provider_cmp = a.provider.as_str().cmp(b.provider.as_str());
162 if provider_cmp == CmpOrdering::Equal {
163 a.id.cmp(&b.id)
164 } else {
165 provider_cmp
166 }
167 });
168
169 let chosen = recommended[0];
170 steer_core::config::model::ModelId::new(chosen.provider.clone(), chosen.id.clone())
171 }
172
173 fn workspace_manager_error_to_status(err: steer_workspace::WorkspaceManagerError) -> Status {
174 match err {
175 steer_workspace::WorkspaceManagerError::NotFound(msg) => Status::not_found(msg),
176 steer_workspace::WorkspaceManagerError::NotSupported(msg) => {
177 Status::failed_precondition(msg)
178 }
179 steer_workspace::WorkspaceManagerError::InvalidRequest(msg) => {
180 Status::invalid_argument(msg)
181 }
182 steer_workspace::WorkspaceManagerError::Io(msg)
183 | steer_workspace::WorkspaceManagerError::Other(msg) => Status::internal(msg),
184 }
185 }
186
187 fn environment_manager_error_to_status(
188 err: steer_workspace::EnvironmentManagerError,
189 ) -> Status {
190 match err {
191 steer_workspace::EnvironmentManagerError::NotFound(msg) => Status::not_found(msg),
192 steer_workspace::EnvironmentManagerError::NotSupported(msg) => {
193 Status::failed_precondition(msg)
194 }
195 steer_workspace::EnvironmentManagerError::InvalidRequest(msg) => {
196 Status::invalid_argument(msg)
197 }
198 steer_workspace::EnvironmentManagerError::Io(msg)
199 | steer_workspace::EnvironmentManagerError::Other(msg) => Status::internal(msg),
200 }
201 }
202
203 fn create_auth_flow(
204 &self,
205 provider_id: &steer_core::config::provider::ProviderId,
206 ) -> Result<(Arc<dyn DynAuthenticationFlow>, AuthMethod), Status> {
207 let provider_cfg = self.provider_registry.get(provider_id).ok_or_else(|| {
208 Status::not_found(format!("Unknown provider: {}", provider_id.as_str()))
209 })?;
210 let provider_name = provider_cfg.name.clone();
211 let auth_storage = self.llm_config_provider.auth_storage().clone();
212
213 if let Some(plugin) = self.llm_config_provider.plugin_registry().get(provider_id)
214 && let Some(flow) = plugin.create_flow(auth_storage.clone())
215 {
216 let methods = flow.available_methods();
217 let method = if methods.contains(&AuthMethod::OAuth) {
218 AuthMethod::OAuth
219 } else if methods.contains(&AuthMethod::ApiKey) {
220 AuthMethod::ApiKey
221 } else {
222 return Err(Status::failed_precondition(format!(
223 "No supported auth methods for provider {}",
224 provider_id.as_str()
225 )));
226 };
227 return Ok((Arc::from(flow), method));
228 }
229
230 let flow = AuthFlowWrapper::new(ApiKeyAuthFlow::new(
231 auth_storage,
232 provider_id.clone(),
233 provider_name,
234 ));
235 Ok((Arc::new(flow), AuthMethod::ApiKey))
236 }
237}
238
239#[tonic::async_trait]
240impl agent_service_server::AgentService for RuntimeAgentService {
241 type SubscribeSessionEventsStream = ReceiverStream<Result<SessionEvent, Status>>;
242 type ListFilesStream = ReceiverStream<Result<ListFilesResponse, Status>>;
243 type GetSessionStream =
244 std::pin::Pin<Box<dyn futures::Stream<Item = Result<GetSessionResponse, Status>> + Send>>;
245 type GetConversationStream = std::pin::Pin<
246 Box<dyn futures::Stream<Item = Result<GetConversationResponse, Status>> + Send>,
247 >;
248
249 async fn subscribe_session_events(
250 &self,
251 request: Request<SubscribeSessionEventsRequest>,
252 ) -> Result<Response<Self::SubscribeSessionEventsStream>, Status> {
253 let req = request.into_inner();
254 let session_id = Self::parse_session_id(&req.session_id)?;
255
256 if let Err(e) = self.runtime.resume_session(session_id).await
257 && !matches!(e, RuntimeError::SessionNotFound { .. })
258 {
259 error!("Failed to resume session {}: {}", session_id, e);
260 }
261
262 let subscription = self
263 .runtime
264 .subscribe_events(session_id)
265 .await
266 .map_err(|e| Status::internal(format!("Failed to subscribe: {e}")))?;
267
268 let delta_subscription = self
269 .runtime
270 .subscribe_deltas(session_id)
271 .await
272 .map_err(|e| Status::internal(format!("Failed to subscribe to deltas: {e}")))?;
273
274 let (tx, rx) = mpsc::channel(100);
275 let last_sequence = Arc::new(AtomicU64::new(req.since_sequence.unwrap_or(0)));
276 let delta_sequence = Arc::new(AtomicU64::new(0));
277
278 let mut min_live_seq = req.since_sequence.map(|seq| seq.saturating_add(1));
279
280 if let Some(after_seq) = req.since_sequence {
281 match self.runtime.load_events_after(session_id, after_seq).await {
282 Ok(events) => {
283 let mut last_seq = after_seq;
284 for (seq, event) in events {
285 last_seq = last_seq.max(seq);
286 let proto_event = match session_event_to_proto(event, seq) {
287 Ok(event) => event,
288 Err(e) => {
289 warn!("Failed to convert session replay event: {}", e);
290 continue;
291 }
292 };
293
294 if proto_event.event.is_none() {
295 continue;
296 }
297
298 if let Err(e) = tx.send(Ok(proto_event)).await {
299 warn!("Failed to send replay event to client: {}", e);
300 break;
301 }
302 }
303 min_live_seq = Some(last_seq.saturating_add(1));
304 last_sequence.store(last_seq, Ordering::Relaxed);
305 }
306 Err(e) => {
307 warn!("Failed to load replay events: {}", e);
308 }
309 }
310 }
311
312 let event_tx = tx.clone();
313 let last_sequence_events = last_sequence.clone();
314 let delta_sequence_counter = delta_sequence.clone();
315 let min_live_seq = min_live_seq;
316 tokio::spawn(async move {
317 async fn send_delta(
318 delta: steer_core::app::domain::delta::StreamDelta,
319 tx: &mpsc::Sender<Result<proto::SessionEvent, Status>>,
320 last_sequence: &Arc<AtomicU64>,
321 delta_sequence: &Arc<AtomicU64>,
322 ) -> Result<(), ()> {
323 let sequence_num = last_sequence.load(Ordering::Relaxed);
324 let delta_sequence = delta_sequence.fetch_add(1, Ordering::Relaxed);
325 let proto_event = match stream_delta_to_proto(delta, sequence_num, delta_sequence) {
326 Ok(event) => event,
327 Err(e) => {
328 warn!("Failed to convert stream delta: {}", e);
329 return Ok(());
330 }
331 };
332
333 if let Err(e) = tx.send(Ok(proto_event)).await {
334 warn!("Failed to send delta to client: {}", e);
335 return Err(());
336 }
337
338 Ok(())
339 }
340
341 let mut subscription = subscription;
342 let mut delta_rx = delta_subscription;
343 let mut events_closed = false;
344 let mut deltas_closed = false;
345
346 loop {
347 if events_closed && deltas_closed {
348 break;
349 }
350
351 tokio::select! {
352 envelope = subscription.recv(), if !events_closed => {
353 match envelope {
354 Some(envelope) => {
355 loop {
356 match delta_rx.try_recv() {
357 Ok(delta) => {
358 if send_delta(
359 delta,
360 &event_tx,
361 &last_sequence_events,
362 &delta_sequence_counter,
363 )
364 .await
365 .is_err()
366 {
367 return;
368 }
369 }
370 Err(broadcast::error::TryRecvError::Empty) => break,
371 Err(broadcast::error::TryRecvError::Lagged(skipped)) => {
372 warn!("Delta subscription lagged by {} messages", skipped);
373 }
374 Err(broadcast::error::TryRecvError::Closed) => {
375 deltas_closed = true;
376 break;
377 }
378 }
379 }
380
381 if let Some(min_seq) = min_live_seq
382 && envelope.seq < min_seq {
383 continue;
384 }
385
386 let proto_event = match session_event_to_proto(envelope.event, envelope.seq) {
387 Ok(event) => event,
388 Err(e) => {
389 warn!("Failed to convert session event: {}", e);
390 continue;
391 }
392 };
393
394 if proto_event.event.is_none() {
395 continue;
396 }
397
398 if let Err(e) = event_tx.send(Ok(proto_event)).await {
399 warn!("Failed to send event to client: {}", e);
400 break;
401 }
402 last_sequence_events.store(envelope.seq, Ordering::Relaxed);
403 }
404 None => {
405 events_closed = true;
406 }
407 }
408 }
409 delta = delta_rx.recv(), if !deltas_closed => {
410 match delta {
411 Ok(delta) => {
412 if send_delta(
413 delta,
414 &event_tx,
415 &last_sequence_events,
416 &delta_sequence_counter,
417 )
418 .await
419 .is_err()
420 {
421 break;
422 }
423 }
424 Err(broadcast::error::RecvError::Lagged(skipped)) => {
425 warn!("Delta subscription lagged by {} messages", skipped);
426 }
427 Err(broadcast::error::RecvError::Closed) => {
428 deltas_closed = true;
429 }
430 }
431 }
432 }
433 }
434 debug!("Event forwarding task ended for session: {}", session_id);
435 });
436
437 Ok(Response::new(ReceiverStream::new(rx)))
438 }
439
440 async fn create_session(
441 &self,
442 request: Request<CreateSessionRequest>,
443 ) -> Result<Response<CreateSessionResponse>, Status> {
444 let req = request.into_inner();
445
446 let default_model_spec = req
447 .default_model
448 .ok_or_else(|| Status::invalid_argument("Missing required default_model"))?;
449 let default_model = proto_to_model(&default_model_spec)
450 .map_err(|e| Status::invalid_argument(format!("Invalid default_model: {e}")))?;
451
452 let tool_config = req
453 .tool_config
454 .map(proto_to_tool_config)
455 .unwrap_or_default();
456
457 let workspace_config = req
458 .workspace_config
459 .map(proto_to_workspace_config)
460 .unwrap_or_default();
461
462 let policy_overrides = proto_to_session_policy_overrides(req.policy_overrides);
463
464 let mut workspace_id = None;
465 let mut workspace_ref = None;
466 let mut repo_ref = None;
467 let parent_session_id = None;
468 let mut workspace_name = None;
469
470 if repo_ref.is_none()
471 && let steer_core::session::state::WorkspaceConfig::Local { path } = &workspace_config
472 {
473 match self
474 .repo_manager
475 .resolve_repo(steer_workspace::EnvironmentId::local(), path)
476 .await
477 {
478 Ok(repo_info) => {
479 repo_ref = Some(steer_workspace::RepoRef {
480 environment_id: repo_info.environment_id,
481 repo_id: repo_info.repo_id,
482 root_path: repo_info.root_path.clone(),
483 vcs_kind: repo_info.vcs_kind,
484 });
485 workspace_name = repo_info
486 .root_path
487 .file_name()
488 .map(|n| n.to_string_lossy().into_owned());
489 }
490 Err(err) => {
491 warn!("Failed to resolve repo for session: {}", err);
492 }
493 }
494 }
495
496 if workspace_id.is_none()
497 && workspace_ref.is_none()
498 && let steer_core::session::state::WorkspaceConfig::Local { path } = &workspace_config
499 && let Ok(info) = self.workspace_manager.resolve_workspace(path).await
500 {
501 workspace_id = Some(info.workspace_id);
502 workspace_ref = Some(steer_workspace::WorkspaceRef {
503 environment_id: info.environment_id,
504 workspace_id: info.workspace_id,
505 repo_id: info.repo_id,
506 });
507 workspace_name.clone_from(&info.name);
508 }
509
510 let session_config = SessionConfig {
511 workspace: workspace_config,
512 workspace_ref,
513 workspace_id,
514 repo_ref,
515 parent_session_id,
516 workspace_name,
517 tool_config,
518 system_prompt: None,
519 primary_agent_id: req.primary_agent_id,
520 policy_overrides,
521 metadata: req.metadata,
522 default_model,
523 auto_compaction: req
524 .auto_compaction
525 .map(|ac| steer_core::session::state::AutoCompactionConfig {
526 enabled: ac.enabled,
527 threshold_percent: ac.threshold_percent,
528 })
529 .unwrap_or_default(),
530 };
531
532 match self.runtime.create_session(session_config.clone()).await {
533 Ok(session_id) => {
534 if let Err(e) = self
535 .catalog
536 .update_session_catalog(session_id, Some(&session_config), false, None)
537 .await
538 {
539 error!("Failed to update session catalog: {}", e);
540 return Err(Status::internal(format!(
541 "Failed to update session catalog: {e}"
542 )));
543 }
544
545 let session_info = SessionInfo {
546 id: session_id.to_string(),
547 created_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
548 updated_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
549 status: proto::SessionStatus::Active as i32,
550 metadata: None,
551 };
552 Ok(Response::new(CreateSessionResponse {
553 session: Some(session_info),
554 }))
555 }
556 Err(e) => {
557 error!("Failed to create session: {}", e);
558 Err(Status::internal(format!("Failed to create session: {e}")))
559 }
560 }
561 }
562
563 async fn list_sessions(
564 &self,
565 _request: Request<ListSessionsRequest>,
566 ) -> Result<Response<ListSessionsResponse>, Status> {
567 let filter = SessionFilter::default();
568
569 match self.catalog.list_sessions(filter).await {
570 Ok(sessions) => {
571 let proto_sessions = sessions
572 .into_iter()
573 .map(|s| SessionInfo {
574 id: s.id.to_string(),
575 created_at: Some(prost_types::Timestamp::from(
576 std::time::SystemTime::from(s.created_at),
577 )),
578 updated_at: Some(prost_types::Timestamp::from(
579 std::time::SystemTime::from(s.updated_at),
580 )),
581 status: proto::SessionStatus::Active as i32,
582 metadata: None,
583 })
584 .collect();
585
586 Ok(Response::new(ListSessionsResponse {
587 sessions: proto_sessions,
588 next_page_token: None,
589 }))
590 }
591 Err(e) => {
592 error!("Failed to list sessions: {}", e);
593 Err(Status::internal(format!("Failed to list sessions: {e}")))
594 }
595 }
596 }
597
598 async fn get_session(
599 &self,
600 request: Request<GetSessionRequest>,
601 ) -> Result<Response<Self::GetSessionStream>, Status> {
602 let req = request.into_inner();
603 let session_id = Self::parse_session_id(&req.session_id)?;
604 let runtime = self.runtime.clone();
605 let catalog = self.catalog.clone();
606
607 let stream = async_stream::try_stream! {
608 if let Err(e) = runtime.resume_session(session_id).await
609 && matches!(e, RuntimeError::SessionNotFound { .. }) {
610 Err(Status::not_found(format!("Session not found: {session_id}")))?;
611 return;
612 }
613
614 let state = runtime.get_session_state(session_id).await
615 .map_err(|e| Status::internal(format!("Failed to get session state: {e}")))?;
616
617 let config = catalog.get_session_config(session_id).await
618 .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?;
619
620 yield GetSessionResponse {
621 chunk: Some(get_session_response::Chunk::Header(SessionStateHeader {
622 id: session_id.to_string(),
623 created_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
624 updated_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
625 config: config.map(|c| crate::grpc::conversions::session_config_to_proto(&c)),
626 last_event_sequence: state.event_sequence,
627 })),
628 };
629
630 for message in state.message_graph.messages {
631 let proto_msg = message_to_proto(message)
632 .map_err(|e| Status::internal(format!("Failed to convert message: {e}")))?;
633 yield GetSessionResponse {
634 chunk: Some(get_session_response::Chunk::Message(proto_msg)),
635 };
636 }
637
638 yield GetSessionResponse {
639 chunk: Some(get_session_response::Chunk::Footer(SessionStateFooter {
640 approved_tools: state.approved_tools.into_iter().collect(),
641 metadata: std::collections::HashMap::new(),
642 queued_head: state
643 .queued_work
644 .front()
645 .map(|item| match item {
646 steer_core::app::domain::state::QueuedWorkItem::UserMessage(message) => {
647 proto::QueuedWorkItem {
648 kind: proto::queued_work_item::Kind::UserMessage as i32,
649 content: message
650 .content
651 .iter()
652 .filter_map(|item| match item {
653 UserContent::Text { text } => Some(text.as_str()),
654 _ => None,
655 })
656 .collect::<Vec<_>>()
657 .join("\n"),
658 model: Some(model_to_proto(message.model.clone())),
659 queued_at: message.queued_at,
660 op_id: message.op_id.to_string(),
661 message_id: message.message_id.to_string(),
662 attachment_count: message
663 .content
664 .iter()
665 .filter(|item| matches!(item, UserContent::Image { .. }))
666 .count() as u32,
667 }
668 }
669 steer_core::app::domain::state::QueuedWorkItem::DirectBash(command) => {
670 proto::QueuedWorkItem {
671 kind: proto::queued_work_item::Kind::DirectBash as i32,
672 content: command.command.clone(),
673 model: None,
674 queued_at: command.queued_at,
675 op_id: command.op_id.to_string(),
676 message_id: command.message_id.to_string(),
677 attachment_count: 0,
678 }
679 }
680 }),
681 queued_count: state.queued_work.len() as u32,
682 })),
683 };
684 };
685
686 Ok(Response::new(Box::pin(stream)))
687 }
688
689 async fn delete_session(
690 &self,
691 request: Request<DeleteSessionRequest>,
692 ) -> Result<Response<DeleteSessionResponse>, Status> {
693 let req = request.into_inner();
694 let session_id = Self::parse_session_id(&req.session_id)?;
695
696 match self.runtime.delete_session(session_id).await {
697 Ok(()) => Ok(Response::new(DeleteSessionResponse {})),
698 Err(RuntimeError::SessionNotFound { .. }) => Err(Status::not_found(format!(
699 "Session not found: {}",
700 req.session_id
701 ))),
702 Err(e) => {
703 error!("Failed to delete session: {}", e);
704 Err(Status::internal(format!("Failed to delete session: {e}")))
705 }
706 }
707 }
708
709 async fn get_conversation(
710 &self,
711 request: Request<GetConversationRequest>,
712 ) -> Result<Response<Self::GetConversationStream>, Status> {
713 let req = request.into_inner();
714 let session_id = Self::parse_session_id(&req.session_id)?;
715 let runtime = self.runtime.clone();
716
717 info!("GetConversation called for session: {}", session_id);
718
719 let stream = async_stream::try_stream! {
720 if let Err(e) = runtime.resume_session(session_id).await
721 && matches!(e, RuntimeError::SessionNotFound { .. }) {
722 Err(Status::not_found(format!("Session not found: {session_id}")))?;
723 return;
724 }
725
726 let state = runtime.get_session_state(session_id).await
727 .map_err(|e| Status::internal(format!("Failed to get session state: {e}")))?;
728
729 info!(
730 "Found session state with {} messages and {} approved tools",
731 state.message_graph.messages.len(),
732 state.approved_tools.len()
733 );
734
735 for msg in state.message_graph.messages {
736 let proto_msg = message_to_proto(msg)
737 .map_err(|e| Status::internal(format!("Failed to convert message: {e}")))?;
738 yield GetConversationResponse {
739 chunk: Some(get_conversation_response::Chunk::Message(proto_msg)),
740 };
741 }
742
743 yield GetConversationResponse {
744 chunk: Some(get_conversation_response::Chunk::Footer(GetConversationFooter {
745 approved_tools: state.approved_tools.into_iter().collect(),
746 compaction_summary_ids: state.compaction_summary_ids.into_iter().collect(),
747 })),
748 };
749 };
750
751 Ok(Response::new(Box::pin(stream)))
752 }
753
754 async fn send_message(
755 &self,
756 request: Request<SendMessageRequest>,
757 ) -> Result<Response<SendMessageResponse>, Status> {
758 let req = request.into_inner();
759 let session_id = Self::parse_session_id(&req.session_id)?;
760
761 let model = if let Some(model_spec) = req.model {
762 proto_to_model(&model_spec)
763 .map_err(|e| Status::invalid_argument(format!("Invalid model spec: {e}")))?
764 } else {
765 let config = self
766 .catalog
767 .get_session_config(session_id)
768 .await
769 .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?
770 .ok_or_else(|| Status::not_found("Session config not found"))?;
771 config.default_model
772 };
773
774 let user_content: Vec<UserContent> = req
775 .content
776 .into_iter()
777 .filter_map(|item| match item.content {
778 Some(proto::user_content::Content::Text(text)) => Some(UserContent::Text { text }),
779 Some(proto::user_content::Content::CommandExecution(cmd)) => {
780 Some(UserContent::CommandExecution {
781 command: cmd.command,
782 stdout: cmd.stdout,
783 stderr: cmd.stderr,
784 exit_code: cmd.exit_code,
785 })
786 }
787 Some(proto::user_content::Content::Image(image)) => {
788 let source = image.source.map(|source| match source {
789 proto::image_content::Source::SessionFile(file) => {
790 steer_core::app::conversation::ImageSource::SessionFile {
791 relative_path: file.relative_path,
792 }
793 }
794 proto::image_content::Source::DataUrl(data_url) => {
795 steer_core::app::conversation::ImageSource::DataUrl {
796 data_url: data_url.data_url,
797 }
798 }
799 proto::image_content::Source::Url(url) => {
800 steer_core::app::conversation::ImageSource::Url { url: url.url }
801 }
802 });
803
804 source.map(|source| UserContent::Image {
805 image: steer_core::app::conversation::ImageContent {
806 mime_type: image.mime_type,
807 source,
808 width: image.width,
809 height: image.height,
810 bytes: image.bytes,
811 sha256: image.sha256,
812 },
813 })
814 }
815 None => None,
816 })
817 .collect();
818
819 let fallback_text = req.message;
820 let content = if user_content.is_empty() {
821 vec![UserContent::Text {
822 text: fallback_text,
823 }]
824 } else {
825 user_content
826 };
827
828 let has_text = content
829 .iter()
830 .any(|item| matches!(item, UserContent::Text { text } if !text.trim().is_empty()));
831
832 let has_non_text = content
833 .iter()
834 .any(|item| !matches!(item, UserContent::Text { .. }));
835
836 if !has_text && !has_non_text {
837 return Err(Status::invalid_argument("Input text cannot be empty"));
838 }
839
840 match self
841 .runtime
842 .submit_user_input(session_id, content, model)
843 .await
844 {
845 Ok(op_id) => Ok(Response::new(SendMessageResponse {
846 operation: Some(Operation {
847 id: op_id.to_string(),
848 session_id: session_id.to_string(),
849 r#type: OperationType::SendMessage as i32,
850 status: OperationStatus::Running as i32,
851 created_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
852 completed_at: None,
853 metadata: std::collections::HashMap::new(),
854 }),
855 })),
856 Err(RuntimeError::InvalidInput { message }) => Err(Status::invalid_argument(message)),
857 Err(e) => {
858 error!("Failed to send message: {}", e);
859 Err(Status::internal(format!("Failed to send message: {e}")))
860 }
861 }
862 }
863
864 async fn edit_message(
865 &self,
866 request: Request<EditMessageRequest>,
867 ) -> Result<Response<EditMessageResponse>, Status> {
868 let req = request.into_inner();
869 let session_id = Self::parse_session_id(&req.session_id)?;
870
871 let model = if let Some(model_spec) = req.model {
872 proto_to_model(&model_spec)
873 .map_err(|e| Status::invalid_argument(format!("Invalid model spec: {e}")))?
874 } else {
875 let config = self
876 .catalog
877 .get_session_config(session_id)
878 .await
879 .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?
880 .ok_or_else(|| Status::not_found("Session config not found"))?;
881 config.default_model
882 };
883
884 let user_content: Vec<UserContent> = req
885 .content
886 .into_iter()
887 .filter_map(|item| match item.content {
888 Some(proto::user_content::Content::Text(text)) => Some(UserContent::Text { text }),
889 Some(proto::user_content::Content::CommandExecution(cmd)) => {
890 Some(UserContent::CommandExecution {
891 command: cmd.command,
892 stdout: cmd.stdout,
893 stderr: cmd.stderr,
894 exit_code: cmd.exit_code,
895 })
896 }
897 Some(proto::user_content::Content::Image(image)) => {
898 let source = image.source.map(|source| match source {
899 proto::image_content::Source::SessionFile(file) => {
900 steer_core::app::conversation::ImageSource::SessionFile {
901 relative_path: file.relative_path,
902 }
903 }
904 proto::image_content::Source::DataUrl(data_url) => {
905 steer_core::app::conversation::ImageSource::DataUrl {
906 data_url: data_url.data_url,
907 }
908 }
909 proto::image_content::Source::Url(url) => {
910 steer_core::app::conversation::ImageSource::Url { url: url.url }
911 }
912 });
913
914 source.map(|source| UserContent::Image {
915 image: steer_core::app::conversation::ImageContent {
916 mime_type: image.mime_type,
917 source,
918 width: image.width,
919 height: image.height,
920 bytes: image.bytes,
921 sha256: image.sha256,
922 },
923 })
924 }
925 None => None,
926 })
927 .collect();
928
929 let content = if user_content.is_empty() {
930 vec![UserContent::Text {
931 text: req.new_content,
932 }]
933 } else {
934 user_content
935 };
936
937 self.runtime
938 .submit_edited_message(session_id, req.message_id, content, model)
939 .await
940 .map_err(|e| match e {
941 RuntimeError::InvalidInput { message } => Status::failed_precondition(message),
942 other => Status::internal(format!("Failed to edit message: {other}")),
943 })?;
944
945 Ok(Response::new(EditMessageResponse {}))
946 }
947
948 async fn dequeue_queued_item(
949 &self,
950 request: Request<DequeueQueuedItemRequest>,
951 ) -> Result<Response<DequeueQueuedItemResponse>, Status> {
952 let req = request.into_inner();
953 let session_id = Self::parse_session_id(&req.session_id)?;
954
955 self.runtime
956 .submit_dequeue_queued_item(session_id)
957 .await
958 .map_err(|e| match e {
959 RuntimeError::InvalidInput { message } => Status::failed_precondition(message),
960 other => Status::internal(format!("Failed to dequeue queued item: {other}")),
961 })?;
962
963 Ok(Response::new(DequeueQueuedItemResponse {}))
964 }
965
966 async fn approve_tool(
967 &self,
968 request: Request<ApproveToolRequest>,
969 ) -> Result<Response<ApproveToolResponse>, Status> {
970 let req = request.into_inner();
971 let session_id = Self::parse_session_id(&req.session_id)?;
972
973 let request_id = Uuid::parse_str(&req.tool_call_id)
974 .map(steer_core::app::domain::types::RequestId::from)
975 .map_err(|_| Status::invalid_argument("Invalid tool call ID"))?;
976
977 let (approved, remember) = match req.decision {
978 Some(decision) => match decision.decision_type {
979 Some(proto::approval_decision::DecisionType::Deny(_)) => (false, None),
980 Some(proto::approval_decision::DecisionType::Once(_)) => (true, None),
981 Some(proto::approval_decision::DecisionType::AlwaysTool(_)) => (
982 true,
983 Some(steer_core::app::domain::action::ApprovalMemory::PendingTool),
984 ),
985 Some(proto::approval_decision::DecisionType::AlwaysBashPattern(pattern)) => (
986 true,
987 Some(steer_core::app::domain::action::ApprovalMemory::BashPattern(pattern)),
988 ),
989 None => {
990 return Err(Status::invalid_argument("Invalid approval decision"));
991 }
992 },
993 None => {
994 return Err(Status::invalid_argument("Missing approval decision"));
995 }
996 };
997
998 match self
999 .runtime
1000 .submit_tool_approval(session_id, request_id, approved, remember)
1001 .await
1002 {
1003 Ok(()) => Ok(Response::new(ApproveToolResponse {})),
1004 Err(e) => {
1005 error!("Failed to approve tool: {}", e);
1006 Err(Status::internal(format!("Failed to approve tool: {e}")))
1007 }
1008 }
1009 }
1010
1011 async fn switch_primary_agent(
1012 &self,
1013 request: Request<SwitchPrimaryAgentRequest>,
1014 ) -> Result<Response<SwitchPrimaryAgentResponse>, Status> {
1015 let req = request.into_inner();
1016 let session_id = Self::parse_session_id(&req.session_id)?;
1017
1018 self.runtime
1019 .switch_primary_agent(session_id, req.primary_agent_id)
1020 .await
1021 .map_err(|e| match e {
1022 RuntimeError::InvalidInput { message } => {
1023 if message.contains("operation is active") {
1024 Status::failed_precondition(message)
1025 } else {
1026 Status::invalid_argument(message)
1027 }
1028 }
1029 other => Status::internal(format!("Failed to switch primary agent: {other}")),
1030 })?;
1031
1032 Ok(Response::new(SwitchPrimaryAgentResponse {}))
1033 }
1034
1035 async fn cancel_operation(
1036 &self,
1037 request: Request<CancelOperationRequest>,
1038 ) -> Result<Response<CancelOperationResponse>, Status> {
1039 let req = request.into_inner();
1040 let session_id = Self::parse_session_id(&req.session_id)?;
1041
1042 match self.runtime.cancel_operation(session_id, None).await {
1043 Ok(()) => Ok(Response::new(CancelOperationResponse {})),
1044 Err(e) => {
1045 error!("Failed to cancel operation: {}", e);
1046 Err(Status::internal(format!("Failed to cancel operation: {e}")))
1047 }
1048 }
1049 }
1050
1051 async fn compact_session(
1052 &self,
1053 request: Request<CompactSessionRequest>,
1054 ) -> Result<Response<CompactSessionResponse>, Status> {
1055 let req = request.into_inner();
1056 let session_id = Self::parse_session_id(&req.session_id)?;
1057 let model_spec = req
1058 .model
1059 .ok_or_else(|| Status::invalid_argument("Missing model spec"))?;
1060 let model = proto_to_model(&model_spec)
1061 .map_err(|e| Status::invalid_argument(format!("Invalid model spec: {e}")))?;
1062
1063 self.runtime
1064 .compact_session(session_id, model)
1065 .await
1066 .map_err(|e| match e {
1067 RuntimeError::InvalidInput { message } => Status::failed_precondition(message),
1068 other => Status::internal(format!("Failed to compact session: {other}")),
1069 })?;
1070
1071 Ok(Response::new(CompactSessionResponse {}))
1072 }
1073
1074 async fn execute_bash_command(
1075 &self,
1076 request: Request<ExecuteBashCommandRequest>,
1077 ) -> Result<Response<ExecuteBashCommandResponse>, Status> {
1078 let req = request.into_inner();
1079 let session_id = Self::parse_session_id(&req.session_id)?;
1080
1081 self.runtime
1082 .execute_bash_command(session_id, req.command)
1083 .await
1084 .map_err(|e| Status::internal(format!("Failed to execute bash command: {e}")))?;
1085
1086 Ok(Response::new(ExecuteBashCommandResponse {}))
1087 }
1088
1089 async fn list_files(
1090 &self,
1091 request: Request<ListFilesRequest>,
1092 ) -> Result<Response<Self::ListFilesStream>, Status> {
1093 let req = request.into_inner();
1094 let session_id = Self::parse_session_id(&req.session_id)?;
1095
1096 debug!("ListFiles called for session: {}", session_id);
1097
1098 let config = self
1099 .catalog
1100 .get_session_config(session_id)
1101 .await
1102 .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?
1103 .ok_or_else(|| Status::not_found(format!("Session not found: {session_id}")))?;
1104
1105 let workspace =
1106 steer_core::workspace::create_workspace(&config.workspace.to_workspace_config())
1107 .await
1108 .map_err(|e| Status::internal(format!("Failed to create workspace: {e}")))?;
1109
1110 let (tx, rx) = mpsc::channel(100);
1111
1112 let _list_task: tokio::task::JoinHandle<()> = tokio::spawn(async move {
1113 let query = if req.query.is_empty() {
1114 None
1115 } else {
1116 Some(req.query.as_str())
1117 };
1118
1119 let max_results = if req.max_results == 0 {
1120 None
1121 } else {
1122 Some(req.max_results as usize)
1123 };
1124
1125 match workspace.list_files(query, max_results).await {
1126 Ok(files) => {
1127 for chunk in files.chunks(1000) {
1128 let response = ListFilesResponse {
1129 paths: chunk.to_vec(),
1130 };
1131
1132 if let Err(e) = tx.send(Ok(response)).await {
1133 warn!("Failed to send file list chunk: {}", e);
1134 break;
1135 }
1136 }
1137 }
1138 Err(e) => {
1139 error!("Failed to list files: {}", e);
1140 let _ = tx
1141 .send(Err(Status::internal(format!("Failed to list files: {e}"))))
1142 .await;
1143 }
1144 }
1145 });
1146
1147 Ok(Response::new(ReceiverStream::new(rx)))
1148 }
1149
1150 async fn get_mcp_servers(
1151 &self,
1152 request: Request<GetMcpServersRequest>,
1153 ) -> Result<Response<GetMcpServersResponse>, Status> {
1154 let req = request.into_inner();
1155 let session_id = Self::parse_session_id(&req.session_id)?;
1156
1157 debug!("GetMcpServers called for session: {}", session_id);
1158
1159 let state = self
1160 .runtime
1161 .get_session_state(session_id)
1162 .await
1163 .map_err(|e| Status::internal(format!("Failed to get session state: {e}")))?;
1164
1165 let config = self
1166 .catalog
1167 .get_session_config(session_id)
1168 .await
1169 .map_err(|e| Status::internal(format!("Failed to get session config: {e}")))?;
1170
1171 let transport_map: std::collections::HashMap<String, &steer_core::tools::McpTransport> =
1172 config
1173 .as_ref()
1174 .map(|c| {
1175 c.tool_config
1176 .backends
1177 .iter()
1178 .map(|b| {
1179 let steer_core::session::state::BackendConfig::Mcp {
1180 server_name,
1181 transport,
1182 ..
1183 } = b;
1184 (server_name.clone(), transport)
1185 })
1186 .collect()
1187 })
1188 .unwrap_or_default();
1189
1190 let servers: Vec<proto::McpServerInfo> = state
1191 .mcp_servers
1192 .into_iter()
1193 .map(|(name, mcp_state)| {
1194 use crate::grpc::conversions::mcp_transport_to_proto;
1195 use steer_core::app::domain::action::McpServerState;
1196
1197 let state = match mcp_state {
1198 McpServerState::Connecting => proto::McpConnectionState {
1199 state: Some(proto::mcp_connection_state::State::Connecting(
1200 proto::McpConnecting {},
1201 )),
1202 },
1203 McpServerState::Connected { tools } => {
1204 let tool_names = tools.iter().map(|t| t.name.clone()).collect();
1205 proto::McpConnectionState {
1206 state: Some(proto::mcp_connection_state::State::Connected(
1207 proto::McpConnected { tool_names },
1208 )),
1209 }
1210 }
1211 McpServerState::Disconnected { error } => {
1212 let error_msg = error.unwrap_or_else(|| "Disconnected".to_string());
1213 proto::McpConnectionState {
1214 state: Some(proto::mcp_connection_state::State::Failed(
1215 proto::McpFailed { error: error_msg },
1216 )),
1217 }
1218 }
1219 McpServerState::Failed { error } => proto::McpConnectionState {
1220 state: Some(proto::mcp_connection_state::State::Failed(
1221 proto::McpFailed { error },
1222 )),
1223 },
1224 };
1225
1226 proto::McpServerInfo {
1227 server_name: name.clone(),
1228 transport: transport_map.get(&name).map(|t| mcp_transport_to_proto(t)),
1229 state: Some(state),
1230 last_updated: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
1231 }
1232 })
1233 .collect();
1234
1235 Ok(Response::new(GetMcpServersResponse { servers }))
1236 }
1237
1238 async fn list_providers(
1239 &self,
1240 _request: Request<ListProvidersRequest>,
1241 ) -> Result<Response<ListProvidersResponse>, Status> {
1242 let providers = self
1243 .provider_registry
1244 .all()
1245 .map(|p| proto::ProviderInfo {
1246 id: p.id.storage_key(),
1247 name: p.name.clone(),
1248 })
1249 .collect();
1250
1251 Ok(Response::new(ListProvidersResponse { providers }))
1252 }
1253
1254 async fn list_models(
1255 &self,
1256 request: Request<ListModelsRequest>,
1257 ) -> Result<Response<ListModelsResponse>, Status> {
1258 let req = request.into_inner();
1259
1260 let mut auth_sources: HashMap<steer_core::config::provider::ProviderId, AuthSource> =
1261 HashMap::new();
1262 let mut visibility_policies: HashMap<
1263 steer_core::config::provider::ProviderId,
1264 Option<Arc<dyn ModelVisibilityPolicy>>,
1265 > = HashMap::new();
1266
1267 let mut all_models = Vec::new();
1268
1269 for model in self.model_registry.recommended() {
1270 if let Some(ref provider_id) = req.provider_id
1271 && model.provider.storage_key() != *provider_id
1272 {
1273 continue;
1274 }
1275
1276 let provider_id = model.provider.clone();
1277
1278 let auth_source = if let Some(source) = auth_sources.get(&provider_id) {
1279 source.clone()
1280 } else {
1281 let source = match self
1282 .llm_config_provider
1283 .resolve_auth_source(&provider_id)
1284 .await
1285 {
1286 Ok(source) => source,
1287 Err(err) => {
1288 warn!(
1289 "Failed to resolve auth source for provider {}: {err}",
1290 provider_id.as_str()
1291 );
1292 AuthSource::None
1293 }
1294 };
1295 auth_sources.insert(provider_id.clone(), source.clone());
1296 source
1297 };
1298
1299 let policy = visibility_policies
1300 .entry(provider_id.clone())
1301 .or_insert_with(|| {
1302 self.llm_config_provider
1303 .plugin_registry()
1304 .get(&provider_id)
1305 .and_then(|plugin| plugin.model_visibility().map(Arc::from))
1306 });
1307
1308 if let Some(policy) = policy {
1309 let auth_model_id = AuthModelId {
1310 provider_id: AuthProviderId(provider_id.as_str().to_string()),
1311 model_id: model.id.clone(),
1312 };
1313 if !policy.allow_model(&auth_model_id, &auth_source) {
1314 continue;
1315 }
1316 }
1317
1318 all_models.push(proto::ProviderModel {
1319 provider_id: model.provider.storage_key(),
1320 model_id: model.id.clone(),
1321 display_name: model
1322 .display_name
1323 .clone()
1324 .unwrap_or_else(|| model.id.clone()),
1325 supports_thinking: model
1326 .parameters
1327 .as_ref()
1328 .and_then(|p| p.thinking_config.as_ref())
1329 .is_some_and(|tc| tc.enabled),
1330 aliases: model.aliases.clone(),
1331 context_window_tokens: model.context_window_tokens,
1332 });
1333 }
1334
1335 Ok(Response::new(ListModelsResponse { models: all_models }))
1336 }
1337
1338 async fn get_provider_auth_status(
1339 &self,
1340 request: Request<proto::GetProviderAuthStatusRequest>,
1341 ) -> Result<Response<proto::GetProviderAuthStatusResponse>, Status> {
1342 let req = request.into_inner();
1343
1344 let mut statuses = Vec::new();
1345 for p in self.provider_registry.all() {
1346 if let Some(ref filter) = req.provider_id
1347 && &p.id.storage_key() != filter
1348 {
1349 continue;
1350 }
1351 let auth_source = self
1352 .llm_config_provider
1353 .resolve_auth_source(&p.id)
1354 .await
1355 .map_err(|e| Status::internal(format!("auth lookup failed: {e}")))?;
1356 let auth_source = crate::grpc::conversions::auth_source_to_proto(auth_source);
1357 statuses.push(proto::ProviderAuthStatus {
1358 provider_id: p.id.storage_key(),
1359 auth_source: Some(auth_source),
1360 });
1361 }
1362
1363 Ok(Response::new(proto::GetProviderAuthStatusResponse {
1364 statuses,
1365 }))
1366 }
1367
1368 async fn start_auth(
1369 &self,
1370 request: Request<proto::StartAuthRequest>,
1371 ) -> Result<Response<proto::StartAuthResponse>, Status> {
1372 self.auth_flow_manager.cleanup().await;
1373 let req = request.into_inner();
1374 let provider_id = steer_core::config::provider::ProviderId(req.provider_id);
1375
1376 let (flow, method) = self.create_auth_flow(&provider_id)?;
1377 let state = flow
1378 .start_auth(method)
1379 .await
1380 .map_err(|e| Status::internal(format!("auth start failed: {e}")))?;
1381 let progress = flow
1382 .get_initial_progress(&state, method)
1383 .await
1384 .map_err(|e| Status::internal(format!("auth progress failed: {e}")))?;
1385
1386 let flow_id = Uuid::new_v4().to_string();
1387 self.auth_flow_manager
1388 .insert(
1389 flow_id.clone(),
1390 AuthFlowEntry {
1391 flow,
1392 state,
1393 last_updated: Instant::now(),
1394 },
1395 )
1396 .await;
1397
1398 Ok(Response::new(proto::StartAuthResponse {
1399 flow_id,
1400 progress: Some(crate::grpc::conversions::auth_progress_to_proto(progress)),
1401 }))
1402 }
1403
1404 async fn send_auth_input(
1405 &self,
1406 request: Request<proto::SendAuthInputRequest>,
1407 ) -> Result<Response<proto::SendAuthInputResponse>, Status> {
1408 self.auth_flow_manager.cleanup().await;
1409 let req = request.into_inner();
1410 let flow_id = req.flow_id.clone();
1411
1412 let mut entry = self
1413 .auth_flow_manager
1414 .take(&flow_id)
1415 .await
1416 .ok_or_else(|| Status::not_found("Auth flow not found"))?;
1417
1418 let progress = entry
1419 .flow
1420 .handle_input(&mut entry.state, &req.input)
1421 .await
1422 .map_err(|e| Status::internal(format!("auth input failed: {e}")))?;
1423
1424 let done = matches!(
1425 progress,
1426 steer_core::auth::AuthProgress::Complete | steer_core::auth::AuthProgress::Error(_)
1427 );
1428
1429 if !done {
1430 entry.last_updated = Instant::now();
1431 self.auth_flow_manager.insert(flow_id, entry).await;
1432 }
1433
1434 Ok(Response::new(proto::SendAuthInputResponse {
1435 progress: Some(crate::grpc::conversions::auth_progress_to_proto(progress)),
1436 }))
1437 }
1438
1439 async fn get_auth_progress(
1440 &self,
1441 request: Request<proto::GetAuthProgressRequest>,
1442 ) -> Result<Response<proto::GetAuthProgressResponse>, Status> {
1443 self.auth_flow_manager.cleanup().await;
1444 let req = request.into_inner();
1445 let flow_id = req.flow_id.clone();
1446
1447 let mut entry = self
1448 .auth_flow_manager
1449 .take(&flow_id)
1450 .await
1451 .ok_or_else(|| Status::not_found("Auth flow not found"))?;
1452
1453 let progress = entry
1454 .flow
1455 .handle_input(&mut entry.state, "")
1456 .await
1457 .map_err(|e| Status::internal(format!("auth progress failed: {e}")))?;
1458
1459 let done = matches!(
1460 progress,
1461 steer_core::auth::AuthProgress::Complete | steer_core::auth::AuthProgress::Error(_)
1462 );
1463
1464 if !done {
1465 entry.last_updated = Instant::now();
1466 self.auth_flow_manager.insert(flow_id, entry).await;
1467 }
1468
1469 Ok(Response::new(proto::GetAuthProgressResponse {
1470 progress: Some(crate::grpc::conversions::auth_progress_to_proto(progress)),
1471 }))
1472 }
1473
1474 async fn cancel_auth(
1475 &self,
1476 request: Request<proto::CancelAuthRequest>,
1477 ) -> Result<Response<proto::CancelAuthResponse>, Status> {
1478 self.auth_flow_manager.cleanup().await;
1479 let req = request.into_inner();
1480 let flow_id = req.flow_id;
1481
1482 let _ = self.auth_flow_manager.take(&flow_id).await;
1483
1484 Ok(Response::new(proto::CancelAuthResponse {}))
1485 }
1486
1487 async fn resolve_model(
1488 &self,
1489 request: Request<proto::ResolveModelRequest>,
1490 ) -> Result<Response<proto::ResolveModelResponse>, Status> {
1491 let req = request.into_inner();
1492
1493 match self.model_registry.resolve(&req.input) {
1494 Ok(model_id) => {
1495 let steer_core::config::model::ModelId { provider, id } = model_id;
1496 let model_spec = proto::ModelSpec {
1497 provider_id: provider.storage_key(),
1498 model_id: id,
1499 };
1500 Ok(Response::new(proto::ResolveModelResponse {
1501 model: Some(model_spec),
1502 }))
1503 }
1504 Err(e) => Err(Status::not_found(format!(
1505 "Failed to resolve model '{}': {}",
1506 req.input, e
1507 ))),
1508 }
1509 }
1510
1511 async fn get_default_model(
1512 &self,
1513 _request: Request<proto::GetDefaultModelRequest>,
1514 ) -> Result<Response<proto::GetDefaultModelResponse>, Status> {
1515 let model = self.select_default_model();
1516 Ok(Response::new(proto::GetDefaultModelResponse {
1517 model: Some(model_to_proto(model)),
1518 }))
1519 }
1520
1521 async fn create_workspace(
1522 &self,
1523 request: Request<proto::CreateWorkspaceRequest>,
1524 ) -> Result<Response<proto::CreateWorkspaceResponse>, Status> {
1525 let req = request.into_inner();
1526 let repo_id = Self::parse_repo_id(&req.repo_id)?;
1527 let parent_workspace_id = match req.parent_workspace_id {
1528 Some(value) => Some(Self::parse_workspace_id(&value)?),
1529 None => None,
1530 };
1531
1532 let strategy = match proto::WorkspaceCreateStrategy::try_from(req.strategy) {
1533 Ok(proto::WorkspaceCreateStrategy::JjWorkspace) => {
1534 steer_workspace::WorkspaceCreateStrategy::JjWorkspace
1535 }
1536 Ok(proto::WorkspaceCreateStrategy::GitWorktree) => {
1537 steer_workspace::WorkspaceCreateStrategy::GitWorktree
1538 }
1539 _ => {
1540 return Err(Status::invalid_argument(
1541 "Unsupported workspace create strategy",
1542 ));
1543 }
1544 };
1545
1546 let request = steer_workspace::CreateWorkspaceRequest {
1547 repo_id,
1548 name: req.name,
1549 parent_workspace_id,
1550 strategy,
1551 };
1552
1553 let workspace = self
1554 .workspace_manager
1555 .create_workspace(request)
1556 .await
1557 .map_err(Self::workspace_manager_error_to_status)?;
1558
1559 Ok(Response::new(proto::CreateWorkspaceResponse {
1560 workspace: Some(workspace_info_to_proto(&workspace)),
1561 }))
1562 }
1563
1564 async fn resolve_repo(
1565 &self,
1566 request: Request<proto::ResolveRepoRequest>,
1567 ) -> Result<Response<proto::ResolveRepoResponse>, Status> {
1568 let req = request.into_inner();
1569 let environment_id = Self::parse_environment_id(&req.environment_id)?;
1570 let repo = self
1571 .repo_manager
1572 .resolve_repo(environment_id, std::path::Path::new(&req.path))
1573 .await
1574 .map_err(Self::workspace_manager_error_to_status)?;
1575
1576 Ok(Response::new(proto::ResolveRepoResponse {
1577 repo: Some(repo_info_to_proto(&repo)),
1578 }))
1579 }
1580
1581 async fn list_repos(
1582 &self,
1583 request: Request<proto::ListReposRequest>,
1584 ) -> Result<Response<proto::ListReposResponse>, Status> {
1585 let req = request.into_inner();
1586 let environment_id = Self::parse_environment_id(&req.environment_id)?;
1587 let repos = self
1588 .repo_manager
1589 .list_repos(environment_id)
1590 .await
1591 .map_err(Self::workspace_manager_error_to_status)?;
1592
1593 Ok(Response::new(proto::ListReposResponse {
1594 repos: repos.iter().map(repo_info_to_proto).collect(),
1595 }))
1596 }
1597
1598 async fn list_workspaces(
1599 &self,
1600 request: Request<proto::ListWorkspacesRequest>,
1601 ) -> Result<Response<proto::ListWorkspacesResponse>, Status> {
1602 let req = request.into_inner();
1603 let environment_id = Self::parse_environment_id(&req.environment_id)?;
1604
1605 let workspaces = self
1606 .workspace_manager
1607 .list_workspaces(steer_workspace::ListWorkspacesRequest { environment_id })
1608 .await
1609 .map_err(Self::workspace_manager_error_to_status)?;
1610
1611 Ok(Response::new(proto::ListWorkspacesResponse {
1612 workspaces: workspaces.iter().map(workspace_info_to_proto).collect(),
1613 }))
1614 }
1615
1616 async fn get_workspace_status(
1617 &self,
1618 request: Request<proto::GetWorkspaceStatusRequest>,
1619 ) -> Result<Response<proto::GetWorkspaceStatusResponse>, Status> {
1620 let req = request.into_inner();
1621 let workspace_id = Self::parse_workspace_id(&req.workspace_id)?;
1622
1623 let status = self
1624 .workspace_manager
1625 .get_workspace_status(workspace_id)
1626 .await
1627 .map_err(Self::workspace_manager_error_to_status)?;
1628
1629 Ok(Response::new(proto::GetWorkspaceStatusResponse {
1630 status: Some(workspace_status_to_proto(&status)),
1631 }))
1632 }
1633
1634 async fn delete_workspace(
1635 &self,
1636 request: Request<proto::DeleteWorkspaceRequest>,
1637 ) -> Result<Response<proto::DeleteWorkspaceResponse>, Status> {
1638 let req = request.into_inner();
1639 let workspace_id = Self::parse_workspace_id(&req.workspace_id)?;
1640
1641 self.workspace_manager
1642 .delete_workspace(steer_workspace::DeleteWorkspaceRequest { workspace_id })
1643 .await
1644 .map_err(Self::workspace_manager_error_to_status)?;
1645
1646 Ok(Response::new(proto::DeleteWorkspaceResponse {}))
1647 }
1648
1649 async fn create_environment(
1650 &self,
1651 request: Request<proto::CreateEnvironmentRequest>,
1652 ) -> Result<Response<proto::CreateEnvironmentResponse>, Status> {
1653 let req = request.into_inner();
1654 let request = steer_workspace::CreateEnvironmentRequest {
1655 root: req.root_path.map(std::path::PathBuf::from),
1656 name: req.name,
1657 };
1658
1659 let env = self
1660 .environment_manager
1661 .create_environment(request)
1662 .await
1663 .map_err(Self::environment_manager_error_to_status)?;
1664
1665 Ok(Response::new(proto::CreateEnvironmentResponse {
1666 environment: Some(environment_descriptor_to_proto(&env)),
1667 }))
1668 }
1669
1670 async fn get_environment(
1671 &self,
1672 request: Request<proto::GetEnvironmentRequest>,
1673 ) -> Result<Response<proto::GetEnvironmentResponse>, Status> {
1674 let req = request.into_inner();
1675 let environment_id = Self::parse_environment_id(&req.environment_id)?;
1676
1677 let env = self
1678 .environment_manager
1679 .get_environment(environment_id)
1680 .await
1681 .map_err(Self::environment_manager_error_to_status)?;
1682
1683 Ok(Response::new(proto::GetEnvironmentResponse {
1684 environment: Some(environment_descriptor_to_proto(&env)),
1685 }))
1686 }
1687
1688 async fn delete_environment(
1689 &self,
1690 request: Request<proto::DeleteEnvironmentRequest>,
1691 ) -> Result<Response<proto::DeleteEnvironmentResponse>, Status> {
1692 let req = request.into_inner();
1693 let environment_id = Self::parse_environment_id(&req.environment_id)?;
1694 let policy = match proto::EnvironmentDeletePolicy::try_from(req.policy) {
1695 Ok(proto::EnvironmentDeletePolicy::Soft) => {
1696 steer_workspace::EnvironmentDeletePolicy::Soft
1697 }
1698 Ok(proto::EnvironmentDeletePolicy::Hard) => {
1699 steer_workspace::EnvironmentDeletePolicy::Hard
1700 }
1701 _ => steer_workspace::EnvironmentDeletePolicy::Hard,
1702 };
1703
1704 self.environment_manager
1705 .delete_environment(environment_id, policy)
1706 .await
1707 .map_err(Self::environment_manager_error_to_status)?;
1708
1709 Ok(Response::new(proto::DeleteEnvironmentResponse {}))
1710 }
1711}