swiftide_docker_executor/
file_loader.rs

1use std::{borrow::Cow, path::PathBuf};
2
3use codegen::{LoadFilesRequest, NodeResponse, loader_client::LoaderClient};
4use swiftide_core::{Loader, indexing::TextNode};
5use tokio::runtime::Handle;
6
7use crate::RunningDockerExecutor;
8
9pub mod codegen {
10    tonic::include_proto!("loader");
11}
12
13#[derive(Debug, Clone)]
14pub struct FileLoader<'a> {
15    path: PathBuf,
16    extensions: Vec<String>,
17    executor: Cow<'a, RunningDockerExecutor>,
18}
19
20impl RunningDockerExecutor {
21    /// Creates an owned file loader from the executor. If needed it is safe to clone the executor.
22    ///
23    /// The loader can be used with a swiftide indexing pipeline.
24    pub fn into_file_loader<V: IntoIterator<Item = T>, T: Into<String>>(
25        self,
26        path: impl Into<PathBuf>,
27        extensions: V,
28    ) -> FileLoader<'static> {
29        let path = path.into();
30        let extensions = extensions.into_iter().map(Into::into).collect::<Vec<_>>();
31        FileLoader {
32            path,
33            extensions,
34            executor: Cow::Owned(self),
35        }
36    }
37
38    /// Creates a borrowed file loader from the executor.
39    pub fn as_file_loader<'a, V: IntoIterator<Item = T>, T: Into<String>>(
40        &'a self,
41        path: impl Into<PathBuf>,
42        extensions: V,
43    ) -> FileLoader<'a> {
44        let path = path.into();
45        let extensions = extensions.into_iter().map(Into::into).collect::<Vec<_>>();
46        FileLoader {
47            path,
48            extensions,
49            executor: Cow::Borrowed(self),
50        }
51    }
52}
53
54impl Loader for FileLoader<'_> {
55    type Output = String;
56    fn into_stream(self) -> swiftide_core::indexing::IndexingStream<String> {
57        let container_ip = &self.executor.container_ip;
58        let container_port = &self.executor.container_port;
59        let mut client = tokio::task::block_in_place(|| {
60            Handle::current().block_on(async {
61                LoaderClient::connect(format!("http://{container_ip}:{container_port}")).await
62            })
63        })
64        .expect("Failed to connect to service");
65
66        let (tx, rx) = tokio::sync::mpsc::channel::<anyhow::Result<TextNode>>(1000);
67
68        tokio::task::spawn(async move {
69            let stream = match client
70                .load_files(LoadFilesRequest {
71                    root_path: self.path.to_string_lossy().to_string(),
72                    file_extensions: self.extensions,
73                })
74                .await
75            {
76                Ok(stream) => stream,
77                Err(error) => {
78                    tracing::error!(error = ?error, "Failed to load files");
79                    return;
80                }
81            };
82
83            let mut stream = stream.into_inner();
84
85            while let Some(result) = stream.message().await.transpose() {
86                if let Err(e) = tx
87                    .send(
88                        result
89                            .map_err(anyhow::Error::from)
90                            .and_then(TryInto::try_into),
91                    )
92                    .await
93                {
94                    tracing::error!(error = ?e, "error sending node");
95                    break;
96                }
97            }
98        });
99
100        rx.into()
101    }
102}
103
104impl TryInto<TextNode> for NodeResponse {
105    type Error = anyhow::Error;
106
107    fn try_into(self) -> Result<TextNode, Self::Error> {
108        TextNode::builder()
109            .path(self.path)
110            .chunk(self.chunk)
111            .original_size(self.original_size as usize)
112            .build()
113    }
114}