swiftide_docker_executor/
file_loader.rs1use std::{borrow::Cow, path::PathBuf};
2
3use codegen::{LoadFilesRequest, NodeResponse, loader_client::LoaderClient};
4use swiftide_core::{Loader, indexing::TextNode};
5use tokio::runtime::Handle;
6
7use crate::RunningDockerExecutor;
8
9pub mod codegen {
10 tonic::include_proto!("loader");
11}
12
13#[derive(Debug, Clone)]
14pub struct FileLoader<'a> {
15 path: PathBuf,
16 extensions: Vec<String>,
17 executor: Cow<'a, RunningDockerExecutor>,
18}
19
20impl RunningDockerExecutor {
21 pub fn into_file_loader<V: IntoIterator<Item = T>, T: Into<String>>(
25 self,
26 path: impl Into<PathBuf>,
27 extensions: V,
28 ) -> FileLoader<'static> {
29 let path = path.into();
30 let extensions = extensions.into_iter().map(Into::into).collect::<Vec<_>>();
31 FileLoader {
32 path,
33 extensions,
34 executor: Cow::Owned(self),
35 }
36 }
37
38 pub fn as_file_loader<'a, V: IntoIterator<Item = T>, T: Into<String>>(
40 &'a self,
41 path: impl Into<PathBuf>,
42 extensions: V,
43 ) -> FileLoader<'a> {
44 let path = path.into();
45 let extensions = extensions.into_iter().map(Into::into).collect::<Vec<_>>();
46 FileLoader {
47 path,
48 extensions,
49 executor: Cow::Borrowed(self),
50 }
51 }
52}
53
54impl Loader for FileLoader<'_> {
55 type Output = String;
56 fn into_stream(self) -> swiftide_core::indexing::IndexingStream<String> {
57 let container_ip = &self.executor.container_ip;
58 let container_port = &self.executor.container_port;
59 let mut client = tokio::task::block_in_place(|| {
60 Handle::current().block_on(async {
61 LoaderClient::connect(format!("http://{container_ip}:{container_port}")).await
62 })
63 })
64 .expect("Failed to connect to service");
65
66 let (tx, rx) = tokio::sync::mpsc::channel::<anyhow::Result<TextNode>>(1000);
67
68 tokio::task::spawn(async move {
69 let stream = match client
70 .load_files(LoadFilesRequest {
71 root_path: self.path.to_string_lossy().to_string(),
72 file_extensions: self.extensions,
73 })
74 .await
75 {
76 Ok(stream) => stream,
77 Err(error) => {
78 tracing::error!(error = ?error, "Failed to load files");
79 return;
80 }
81 };
82
83 let mut stream = stream.into_inner();
84
85 while let Some(result) = stream.message().await.transpose() {
86 if let Err(e) = tx
87 .send(
88 result
89 .map_err(anyhow::Error::from)
90 .and_then(TryInto::try_into),
91 )
92 .await
93 {
94 tracing::error!(error = ?e, "error sending node");
95 break;
96 }
97 }
98 });
99
100 rx.into()
101 }
102}
103
104impl TryInto<TextNode> for NodeResponse {
105 type Error = anyhow::Error;
106
107 fn try_into(self) -> Result<TextNode, Self::Error> {
108 TextNode::builder()
109 .path(self.path)
110 .chunk(self.chunk)
111 .original_size(self.original_size as usize)
112 .build()
113 }
114}