1use std::{
2 borrow::Cow,
3 path::{Path, PathBuf},
4 sync::{Arc, Mutex},
5 time::Duration,
6};
7
8use crate::{
9 chat_completion::{ChatMessage, ToolCall},
10 indexing::IndexingStream,
11};
12use anyhow::Result;
13use async_trait::async_trait;
14use dyn_clone::DynClone;
15use serde::{Deserialize, Serialize};
16use thiserror::Error;
17
18#[async_trait]
30pub trait ToolExecutor: Send + Sync + DynClone {
31 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError>;
33
34 async fn stream_files(
36 &self,
37 path: &Path,
38 extensions: Option<Vec<String>>,
39 ) -> Result<IndexingStream<String>>;
40}
41
42dyn_clone::clone_trait_object!(ToolExecutor);
43
/// Pairs an executor with a base directory (the "scope").
///
/// The wrapper itself stores no other state; how the scope is applied is
/// defined by the inherent methods and trait implementations on this type.
#[derive(Clone, Debug)]
pub struct ScopedExecutor<E> {
    /// The wrapped executor (possibly a reference to one).
    executor: E,
    /// Base directory onto which relative paths are joined.
    scope: PathBuf,
}
53
54impl<E> ScopedExecutor<E> {
55 pub fn new(executor: E, scope: impl Into<PathBuf>) -> Self {
57 Self {
58 executor,
59 scope: scope.into(),
60 }
61 }
62
63 fn apply_scope<'a>(&'a self, cmd: &'a Command) -> Cow<'a, Command> {
65 match cmd.current_dir_path() {
66 Some(path) if path.is_absolute() || self.scope.as_os_str().is_empty() => {
67 Cow::Borrowed(cmd)
68 }
69 Some(path) => {
70 let mut scoped = cmd.clone();
71 scoped.current_dir(self.scope.join(path));
72 Cow::Owned(scoped)
73 }
74 None if self.scope.as_os_str().is_empty() => Cow::Borrowed(cmd),
75 None => {
76 let mut scoped = cmd.clone();
77 scoped.current_dir(self.scope.clone());
78 Cow::Owned(scoped)
79 }
80 }
81 }
82
83 fn scoped_path<'a>(&'a self, path: &'a Path) -> Cow<'a, Path> {
85 if path.is_absolute() || self.scope.as_os_str().is_empty() {
86 Cow::Borrowed(path)
87 } else {
88 Cow::Owned(self.scope.join(path))
89 }
90 }
91
92 pub fn inner(&self) -> &E {
94 &self.executor
95 }
96
97 pub fn scope(&self) -> &Path {
99 &self.scope
100 }
101}
102
103#[async_trait]
104impl<'a, E> ToolExecutor for ScopedExecutor<&'a E>
105where
106 E: ToolExecutor + Send + Sync + 'a,
107{
108 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
109 let scoped_cmd = self.apply_scope(cmd);
110 self.executor.exec_cmd(scoped_cmd.as_ref()).await
111 }
112
113 async fn stream_files(
114 &self,
115 path: &Path,
116 extensions: Option<Vec<String>>,
117 ) -> Result<IndexingStream<String>> {
118 let scoped_path = self.scoped_path(path);
119 self.executor
120 .stream_files(scoped_path.as_ref(), extensions)
121 .await
122 }
123}
124
125pub trait ExecutorExt {
127 fn scoped(&self, path: impl Into<PathBuf>) -> ScopedExecutor<&Self>;
129}
130
131impl<T> ExecutorExt for T
132where
133 T: ToolExecutor + ?Sized,
134{
135 fn scoped(&self, path: impl Into<PathBuf>) -> ScopedExecutor<&Self> {
136 ScopedExecutor::new(self, path)
137 }
138}
139
140#[async_trait]
141impl<T> ToolExecutor for &T
142where
143 T: ToolExecutor + ?Sized,
144{
145 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
146 (**self).exec_cmd(cmd).await
147 }
148
149 async fn stream_files(
150 &self,
151 path: &Path,
152 extensions: Option<Vec<String>>,
153 ) -> Result<IndexingStream<String>> {
154 (**self).stream_files(path, extensions).await
155 }
156}
157
158#[async_trait]
159impl ToolExecutor for Arc<dyn ToolExecutor> {
160 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
161 self.as_ref().exec_cmd(cmd).await
162 }
163
164 async fn stream_files(
165 &self,
166 path: &Path,
167 extensions: Option<Vec<String>>,
168 ) -> Result<IndexingStream<String>> {
169 self.as_ref().stream_files(path, extensions).await
170 }
171}
172
173#[async_trait]
174impl ToolExecutor for Box<dyn ToolExecutor> {
175 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
176 self.as_ref().exec_cmd(cmd).await
177 }
178
179 async fn stream_files(
180 &self,
181 path: &Path,
182 extensions: Option<Vec<String>>,
183 ) -> Result<IndexingStream<String>> {
184 self.as_ref().stream_files(path, extensions).await
185 }
186}
187
188#[derive(Debug, Error)]
189pub enum CommandError {
190 #[error("executor error: {0:#}")]
192 ExecutorError(#[from] anyhow::Error),
193
194 #[error("command timed out after {timeout:?}: {output}")]
196 TimedOut {
197 timeout: Duration,
198 output: CommandOutput,
199 },
200
201 #[error("command failed with NonZeroExit: {0}")]
203 NonZeroExit(CommandOutput),
204}
205
206impl From<std::io::Error> for CommandError {
207 fn from(err: std::io::Error) -> Self {
208 CommandError::NonZeroExit(err.to_string().into())
209 }
210}
211
/// A unit of work handed to an executor.
///
/// Every variant carries an optional working directory (`current_dir`) and an
/// optional `timeout`, both unset by the constructors on this type.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Command {
    /// A shell command line.
    Shell {
        command: String,
        current_dir: Option<PathBuf>,
        timeout: Option<Duration>,
    },
    /// A request to read the file at `path`.
    ReadFile {
        path: PathBuf,
        current_dir: Option<PathBuf>,
        timeout: Option<Duration>,
    },
    /// A request to write `content` to the file at `path`.
    WriteFile {
        path: PathBuf,
        content: String,
        current_dir: Option<PathBuf>,
        timeout: Option<Duration>,
    },
}

impl Command {
    /// Builds a [`Command::Shell`] with no working directory and no timeout.
    pub fn shell<S: Into<String>>(cmd: S) -> Self {
        Self::Shell {
            command: cmd.into(),
            current_dir: None,
            timeout: None,
        }
    }

    /// Builds a [`Command::ReadFile`] with no working directory and no timeout.
    pub fn read_file<P: Into<PathBuf>>(path: P) -> Self {
        Self::ReadFile {
            path: path.into(),
            current_dir: None,
            timeout: None,
        }
    }

    /// Builds a [`Command::WriteFile`] with no working directory and no timeout.
    pub fn write_file<P: Into<PathBuf>, S: Into<String>>(path: P, content: S) -> Self {
        Self::WriteFile {
            path: path.into(),
            content: content.into(),
            current_dir: None,
            timeout: None,
        }
    }

    /// Consuming variant of [`Command::current_dir`].
    #[must_use]
    pub fn with_current_dir<P: Into<PathBuf>>(mut self, path: P) -> Self {
        self.current_dir(path);
        self
    }

    /// Sets the working directory and returns `self` for chaining.
    pub fn current_dir<P: Into<PathBuf>>(&mut self, path: P) -> &mut Self {
        let dir = Some(path.into());
        *self.current_dir_slot() = dir;
        self
    }

    /// Removes any working directory and returns `self` for chaining.
    pub fn clear_current_dir(&mut self) -> &mut Self {
        *self.current_dir_slot() = None;
        self
    }

    /// Returns the working directory, if one has been set.
    pub fn current_dir_path(&self) -> Option<&Path> {
        let (Self::Shell { current_dir, .. }
        | Self::ReadFile { current_dir, .. }
        | Self::WriteFile { current_dir, .. }) = self;
        current_dir.as_deref()
    }

    /// Consuming variant of [`Command::timeout`].
    #[must_use]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout(timeout);
        self
    }

    /// Sets the timeout and returns `self` for chaining.
    pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
        *self.timeout_slot() = Some(timeout);
        self
    }

    /// Removes any timeout and returns `self` for chaining.
    pub fn clear_timeout(&mut self) -> &mut Self {
        *self.timeout_slot() = None;
        self
    }

    /// Returns the timeout, if one has been set.
    pub fn timeout_duration(&self) -> Option<&Duration> {
        let (Self::Shell { timeout, .. }
        | Self::ReadFile { timeout, .. }
        | Self::WriteFile { timeout, .. }) = self;
        timeout.as_ref()
    }

    /// Mutable access to the `current_dir` field shared by every variant.
    fn current_dir_slot(&mut self) -> &mut Option<PathBuf> {
        let (Self::Shell { current_dir, .. }
        | Self::ReadFile { current_dir, .. }
        | Self::WriteFile { current_dir, .. }) = self;
        current_dir
    }

    /// Mutable access to the `timeout` field shared by every variant.
    fn timeout_slot(&mut self) -> &mut Option<Duration> {
        let (Self::Shell { timeout, .. }
        | Self::ReadFile { timeout, .. }
        | Self::WriteFile { timeout, .. }) = self;
        timeout
    }
}
352
/// Textual output produced by running a command.
#[derive(Clone, Debug)]
pub struct CommandOutput {
    /// The captured output text.
    pub output: String,
}

impl CommandOutput {
    /// Creates an output holding an empty string.
    pub fn empty() -> Self {
        Self::new(String::new())
    }

    /// Creates an output from anything convertible into a `String`.
    pub fn new(output: impl Into<String>) -> Self {
        Self {
            output: output.into(),
        }
    }

    /// Returns `true` when the output text is empty.
    pub fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

/// Displays exactly the output text, honouring formatter flags such as width
/// and precision.
impl std::fmt::Display for CommandOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fully qualified so the call resolves no matter which formatting
        // traits (none, `Display`, or both `Display` and `Debug`) are in scope.
        std::fmt::Display::fmt(&self.output, f)
    }
}

/// Builds an output from anything convertible into a `String`.
impl<T: Into<String>> From<T> for CommandOutput {
    fn from(value: T) -> Self {
        Self::new(value)
    }
}

/// Borrows the output text as a `str`.
impl AsRef<str> for CommandOutput {
    fn as_ref(&self) -> &str {
        self.output.as_str()
    }
}
397
398#[derive(Debug, Clone, Serialize, Deserialize, strum_macros::EnumIs)]
400#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))]
401pub enum ToolFeedback {
402 Approved { payload: Option<serde_json::Value> },
403 Refused { payload: Option<serde_json::Value> },
404}
405
406impl ToolFeedback {
407 pub fn approved() -> Self {
408 ToolFeedback::Approved { payload: None }
409 }
410
411 pub fn refused() -> Self {
412 ToolFeedback::Refused { payload: None }
413 }
414
415 pub fn payload(&self) -> Option<&serde_json::Value> {
416 match self {
417 ToolFeedback::Refused { payload } | ToolFeedback::Approved { payload } => {
418 payload.as_ref()
419 }
420 }
421 }
422
423 #[must_use]
424 pub fn with_payload(self, payload: serde_json::Value) -> Self {
425 match self {
426 ToolFeedback::Approved { .. } => ToolFeedback::Approved {
427 payload: Some(payload),
428 },
429 ToolFeedback::Refused { .. } => ToolFeedback::Refused {
430 payload: Some(payload),
431 },
432 }
433 }
434}
435
436#[async_trait]
438pub trait AgentContext: Send + Sync {
439 async fn next_completion(&self) -> Result<Option<Vec<ChatMessage>>>;
449
450 async fn current_new_messages(&self) -> Result<Vec<ChatMessage>>;
452
453 async fn add_messages(&self, item: Vec<ChatMessage>) -> Result<()>;
455
456 async fn add_message(&self, item: ChatMessage) -> Result<()>;
458
459 #[deprecated(note = "use executor instead")]
463 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError>;
464
465 fn executor(&self) -> &Arc<dyn ToolExecutor>;
466
467 async fn history(&self) -> Result<Vec<ChatMessage>>;
468
469 async fn redrive(&self) -> Result<()>;
474
475 async fn has_received_feedback(&self, tool_call: &ToolCall) -> Option<ToolFeedback>;
478
479 async fn feedback_received(&self, tool_call: &ToolCall, feedback: &ToolFeedback) -> Result<()>;
480}
481
482#[async_trait]
483impl AgentContext for Box<dyn AgentContext> {
484 async fn next_completion(&self) -> Result<Option<Vec<ChatMessage>>> {
485 (**self).next_completion().await
486 }
487
488 async fn current_new_messages(&self) -> Result<Vec<ChatMessage>> {
489 (**self).current_new_messages().await
490 }
491
492 async fn add_messages(&self, item: Vec<ChatMessage>) -> Result<()> {
493 (**self).add_messages(item).await
494 }
495
496 async fn add_message(&self, item: ChatMessage) -> Result<()> {
497 (**self).add_message(item).await
498 }
499
500 #[allow(deprecated)]
501 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
502 (**self).exec_cmd(cmd).await
503 }
504
505 fn executor(&self) -> &Arc<dyn ToolExecutor> {
506 (**self).executor()
507 }
508
509 async fn history(&self) -> Result<Vec<ChatMessage>> {
510 (**self).history().await
511 }
512
513 async fn redrive(&self) -> Result<()> {
514 (**self).redrive().await
515 }
516
517 async fn has_received_feedback(&self, tool_call: &ToolCall) -> Option<ToolFeedback> {
518 (**self).has_received_feedback(tool_call).await
519 }
520
521 async fn feedback_received(&self, tool_call: &ToolCall, feedback: &ToolFeedback) -> Result<()> {
522 (**self).feedback_received(tool_call, feedback).await
523 }
524}
525
526#[async_trait]
527impl AgentContext for Arc<dyn AgentContext> {
528 async fn next_completion(&self) -> Result<Option<Vec<ChatMessage>>> {
529 (**self).next_completion().await
530 }
531
532 async fn current_new_messages(&self) -> Result<Vec<ChatMessage>> {
533 (**self).current_new_messages().await
534 }
535
536 async fn add_messages(&self, item: Vec<ChatMessage>) -> Result<()> {
537 (**self).add_messages(item).await
538 }
539
540 async fn add_message(&self, item: ChatMessage) -> Result<()> {
541 (**self).add_message(item).await
542 }
543
544 #[allow(deprecated)]
545 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
546 (**self).exec_cmd(cmd).await
547 }
548
549 fn executor(&self) -> &Arc<dyn ToolExecutor> {
550 (**self).executor()
551 }
552
553 async fn history(&self) -> Result<Vec<ChatMessage>> {
554 (**self).history().await
555 }
556
557 async fn redrive(&self) -> Result<()> {
558 (**self).redrive().await
559 }
560
561 async fn has_received_feedback(&self, tool_call: &ToolCall) -> Option<ToolFeedback> {
562 (**self).has_received_feedback(tool_call).await
563 }
564
565 async fn feedback_received(&self, tool_call: &ToolCall, feedback: &ToolFeedback) -> Result<()> {
566 (**self).feedback_received(tool_call, feedback).await
567 }
568}
569
570#[async_trait]
571impl AgentContext for &dyn AgentContext {
572 async fn next_completion(&self) -> Result<Option<Vec<ChatMessage>>> {
573 (**self).next_completion().await
574 }
575
576 async fn current_new_messages(&self) -> Result<Vec<ChatMessage>> {
577 (**self).current_new_messages().await
578 }
579
580 async fn add_messages(&self, item: Vec<ChatMessage>) -> Result<()> {
581 (**self).add_messages(item).await
582 }
583
584 async fn add_message(&self, item: ChatMessage) -> Result<()> {
585 (**self).add_message(item).await
586 }
587
588 #[allow(deprecated)]
589 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
590 (**self).exec_cmd(cmd).await
591 }
592
593 fn executor(&self) -> &Arc<dyn ToolExecutor> {
594 (**self).executor()
595 }
596
597 async fn history(&self) -> Result<Vec<ChatMessage>> {
598 (**self).history().await
599 }
600
601 async fn redrive(&self) -> Result<()> {
602 (**self).redrive().await
603 }
604
605 async fn has_received_feedback(&self, tool_call: &ToolCall) -> Option<ToolFeedback> {
606 (**self).has_received_feedback(tool_call).await
607 }
608
609 async fn feedback_received(&self, tool_call: &ToolCall, feedback: &ToolFeedback) -> Result<()> {
610 (**self).feedback_received(tool_call, feedback).await
611 }
612}
613
614#[async_trait]
618impl AgentContext for () {
619 async fn next_completion(&self) -> Result<Option<Vec<ChatMessage>>> {
620 Ok(None)
621 }
622
623 async fn current_new_messages(&self) -> Result<Vec<ChatMessage>> {
624 Ok(Vec::new())
625 }
626
627 async fn add_messages(&self, _item: Vec<ChatMessage>) -> Result<()> {
628 Ok(())
629 }
630
631 async fn add_message(&self, _item: ChatMessage) -> Result<()> {
632 Ok(())
633 }
634
635 async fn exec_cmd(&self, _cmd: &Command) -> Result<CommandOutput, CommandError> {
636 Err(CommandError::ExecutorError(anyhow::anyhow!(
637 "Empty agent context does not have a tool executor"
638 )))
639 }
640
641 fn executor(&self) -> &Arc<dyn ToolExecutor> {
642 unimplemented!("Empty agent context does not have a tool executor")
643 }
644
645 async fn history(&self) -> Result<Vec<ChatMessage>> {
646 Ok(Vec::new())
647 }
648
649 async fn redrive(&self) -> Result<()> {
650 Ok(())
651 }
652
653 async fn has_received_feedback(&self, _tool_call: &ToolCall) -> Option<ToolFeedback> {
654 Some(ToolFeedback::Approved { payload: None })
655 }
656
657 async fn feedback_received(
658 &self,
659 _tool_call: &ToolCall,
660 _feedback: &ToolFeedback,
661 ) -> Result<()> {
662 Ok(())
663 }
664}
665
666#[async_trait]
671pub trait MessageHistory: Send + Sync + std::fmt::Debug {
672 async fn history(&self) -> Result<Vec<ChatMessage>>;
674
675 async fn push_owned(&self, item: ChatMessage) -> Result<()>;
677
678 async fn overwrite(&self, items: Vec<ChatMessage>) -> Result<()>;
680
681 async fn push(&self, item: &ChatMessage) -> Result<()> {
683 self.push_owned(item.clone()).await
684 }
685
686 async fn extend(&self, items: &[ChatMessage]) -> Result<()> {
688 self.extend_owned(items.to_vec()).await
689 }
690
691 async fn extend_owned(&self, items: Vec<ChatMessage>) -> Result<()> {
693 for item in items {
694 self.push_owned(item).await?;
695 }
696
697 Ok(())
698 }
699}
700
701#[async_trait]
702impl MessageHistory for Mutex<Vec<ChatMessage>> {
703 async fn history(&self) -> Result<Vec<ChatMessage>> {
704 Ok(self.lock().unwrap().clone())
705 }
706
707 async fn push_owned(&self, item: ChatMessage) -> Result<()> {
708 self.lock().unwrap().push(item);
709
710 Ok(())
711 }
712
713 async fn overwrite(&self, items: Vec<ChatMessage>) -> Result<()> {
714 let mut lock = self.lock().unwrap();
715 *lock = items;
716
717 Ok(())
718 }
719}