small_bin/
sftp.rs

1use crate::{
2    config::AppConfig,
3    database::{Database, History, QueueItem},
4    notification::notification,
5    utils, *,
6};
7use anyhow::{Context, Result};
8use chrono::Utc;
9use ssh2::Session;
10use std::{fs::File, io::BufReader, net::TcpStream, path::Path, sync::Arc, time::Duration};
11use tokio::{sync::mpsc, time};
12
13#[derive(Debug)]
14pub struct SftpManager {
15    config: Arc<AppConfig>,
16    database: Arc<Database>,
17}
18
19
20impl SftpManager {
21    pub fn new(
22        config: Arc<AppConfig>,
23        database: Arc<Database>,
24    ) -> (Self, mpsc::UnboundedReceiver<()>) {
25        let (_tx, rx) = mpsc::unbounded_channel();
26        (
27            SftpManager {
28                config,
29                database,
30            },
31            rx,
32        )
33    }
34
35
36    pub async fn start(self: Arc<Self>) {
37        let mut interval =
38            time::interval(Duration::from_millis(self.config.fs_check_interval));
39
40        info!(
41            "Starting SFTP queue processor with check interval: {}ms",
42            self.config.fs_check_interval
43        );
44
45        loop {
46            interval.tick().await;
47            if let Err(e) = self.process_queue().await {
48                error!("Error processing queue: {e:?}");
49            }
50        }
51    }
52
53
54    async fn process_queue(&self) -> Result<()> {
55        let queue = self.database.get_queue()?;
56
57        if queue.is_empty() {
58            return Ok(());
59        }
60
61        // Build clipboard content
62        self.build_clipboard(&queue)?;
63
64        // Process each queue item
65        for item in queue {
66            if let Err(e) = self.process_element(&item).await {
67                error!("Error processing queue element: {e:?}");
68            }
69        }
70
71        Ok(())
72    }
73
74
75    fn build_clipboard(&self, queue: &[QueueItem]) -> Result<()> {
76        let links: Vec<String> = queue
77            .iter()
78            .map(|item| {
79                let config = self
80                    .config
81                    .select_config()
82                    .expect("One of configs should always be selected!");
83                format!(
84                    "{}{}{}",
85                    config.address,
86                    item.uuid,
87                    file_extension(&item.local_file)
88                )
89            })
90            .collect();
91
92        let content = links.join(", ");
93        put_to_clipboard(&content)?;
94
95        if self.config.notifications.clipboard {
96            let _ = notification(
97                "Link copied to clipboard",
98                "clipboard",
99                &self.config.notifications,
100                &self.config.sounds,
101            );
102        }
103
104        Ok(())
105    }
106
107
108    async fn process_element(&self, item: &QueueItem) -> Result<()> {
109        let path = Path::new(&item.local_file);
110
111        // Check if file exists and is regular
112        if !path.exists() || !path.is_file() {
113            debug!(
114                "Local file not found or not a regular file: {}. Skipping.",
115                item.local_file
116            );
117            self.database.remove_from_queue(&item.uuid)?;
118            return Ok(());
119        }
120
121        // Check for temp file pattern
122        let temp_pattern = regex::Regex::new(r".*-[a-zA-Z]{4,}$")?;
123        if temp_pattern.is_match(&item.local_file) {
124            debug!("File matches temp pattern, skipping: {}", item.local_file);
125            self.database.remove_from_queue(&item.uuid)?;
126            return Ok(());
127        }
128
129        // Upload file
130        let remote_file = format!("{}{}", item.remote_file, file_extension(&item.local_file));
131        self.send_file(&item.local_file, &remote_file).await?;
132
133        // Add to history
134        self.add_to_history(item)?;
135
136        // Remove from queue
137        self.database.remove_from_queue(&item.uuid)?;
138
139        Ok(())
140    }
141
142
143    async fn send_file(&self, local_file: &str, remote_file: &str) -> Result<()> {
144        let config = &self
145            .config
146            .select_config()
147            .expect("One of configs should always be selected!");
148
149        // Connect via SSH
150        let tcp = TcpStream::connect(format!("{}:{}", config.hostname, config.ssh_port))
151            .context("Failed to connect to SSH server")?;
152        tcp.set_read_timeout(Some(Duration::from_millis(
153            self.config.ssh_connection_timeout,
154        )))?;
155        tcp.set_write_timeout(Some(Duration::from_millis(
156            self.config.ssh_connection_timeout,
157        )))?;
158
159        let mut sess = Session::new()?;
160        sess.set_tcp_stream(tcp);
161        sess.handshake()?;
162
163        // Authenticate
164        let ssh_private_key = if config.ssh_key.is_empty() {
165            ".ssh/id_ed25519"
166        } else {
167            &config.ssh_key
168        };
169
170        sess.userauth_pubkey_file(
171            &config.username,
172            None,
173            Path::new(&home::home_dir().expect("Home dir has to be set!"))
174                .join(ssh_private_key)
175                .as_path(),
176            if config.ssh_key_pass.is_empty() {
177                None
178            } else {
179                Some(&config.ssh_key_pass)
180            },
181        )?;
182
183        if !sess.authenticated() {
184            anyhow::bail!("SSH authentication failed");
185        }
186
187        debug!("SSH connection established");
188
189        // Start SFTP session
190        let sftp = sess.sftp()?;
191        debug!("SFTP session started");
192
193        // Check remote file
194        let local_size = local_file_size(local_file)?;
195        let remote_size = sftp
196            .stat(Path::new(remote_file))
197            .map(|stat| stat.size.unwrap_or(0))
198            .unwrap_or(0);
199
200        debug!(
201            "Local file: {local_file} ({local_size}); Remote file: {remote_file} ({remote_size})"
202        );
203
204        if remote_size > 0 && remote_size == local_size {
205            info!("Found file of same size already uploaded. Skipping");
206            return Ok(());
207        }
208
209        // Upload file
210        let mut local = BufReader::new(File::open(local_file)?);
211        let mut remote = sftp.create(Path::new(remote_file))?;
212
213        stream_file_to_remote(
214            &mut local,
215            &mut remote,
216            self.config.sftp_buffer_size,
217            local_size,
218        )?;
219
220        if self.config.notifications.upload {
221            let _ = notification(
222                "Uploaded successfully.",
223                "upload",
224                &self.config.notifications,
225                &self.config.sounds,
226            );
227        }
228        Ok(())
229    }
230
231
232    fn add_to_history(&self, queue_item: &QueueItem) -> Result<()> {
233        let config = self
234            .config
235            .select_config()
236            .expect("One of configs should always be selected!");
237        let content = format!(
238            "{}{}{}",
239            config.address,
240            queue_item.uuid,
241            file_extension(&queue_item.local_file)
242        );
243
244        // Check if already in history
245        let history = self.database.get_history(None)?;
246        let exists = history.iter().any(|h| h.content.contains(&content));
247
248        if !exists {
249            let history_item = History {
250                content,
251                timestamp: Utc::now().timestamp(),
252                file: queue_item.local_file.clone(),
253                uuid: uuid::Uuid::new_v4().to_string(),
254            };
255            self.database.add_history(&history_item)?;
256        }
257
258        Ok(())
259    }
260}