small_bin/
sftp.rs

1use crate::{
2    config::AppConfig,
3    database::{Database, History, QueueItem},
4    notification::notification,
5    utils, *,
6};
7use anyhow::{Context, Result};
8use ssh2::Session;
9use std::{fs::File, io::BufReader, net::TcpStream, path::Path, sync::Arc, time::Duration};
10use tokio::{sync::mpsc, time};
11
12#[derive(Debug)]
13pub struct SftpManager {
14    config: Arc<AppConfig>,
15    database: Arc<Database>,
16}
17
18
19impl SftpManager {
20    pub fn new(
21        config: Arc<AppConfig>,
22        database: Arc<Database>,
23    ) -> (Self, mpsc::UnboundedReceiver<()>) {
24        let (_tx, rx) = mpsc::unbounded_channel();
25        (
26            SftpManager {
27                config,
28                database,
29            },
30            rx,
31        )
32    }
33
34
35    pub async fn start(self: Arc<Self>) {
36        let mut interval =
37            time::interval(Duration::from_millis(self.config.fs_check_interval));
38
39        info!(
40            "Starting SFTP queue processor with check interval: {}ms",
41            self.config.fs_check_interval
42        );
43
44        loop {
45            interval.tick().await;
46            if let Err(e) = self.process_queue().await {
47                error!("Error processing queue: {e:?}");
48            }
49        }
50    }
51
52
53    async fn process_queue(&self) -> Result<()> {
54        let queue = self.database.get_queue()?;
55        if !queue.is_empty() {
56            // Build clipboard content
57            self.build_clipboard(&queue)?;
58
59            // Process each queue item
60            for item in &queue {
61                if let Err(e) = self.process_element(item).await {
62                    error!("Error processing queue element: {e:?}");
63                }
64            }
65        }
66        Ok(())
67    }
68
69
70    fn build_clipboard(&self, queue: &[QueueItem]) -> Result<()> {
71        let links: Vec<String> = queue
72            .iter()
73            .map(|item| {
74                let config = self
75                    .config
76                    .select_config()
77                    .expect("One of configs should always be selected!");
78                format!(
79                    "{}{}{}",
80                    config.address,
81                    item.uuid,
82                    file_extension(&item.local_file)
83                )
84            })
85            .collect();
86
87        let content = links.join(", ");
88        put_to_clipboard(&content)?;
89
90        if self.config.notifications.clipboard {
91            let _ = notification(
92                "Link copied to clipboard",
93                "clipboard",
94                &self.config.notifications,
95                &self.config.sounds,
96            );
97        }
98
99        Ok(())
100    }
101
102
103    async fn process_element(&self, item: &QueueItem) -> Result<()> {
104        let path = Path::new(&item.local_file);
105
106        // Check if file exists and is regular
107        if !path.exists() || !path.is_file() {
108            debug!(
109                "Local file not found or not a regular file: {}. Skipping.",
110                item.local_file
111            );
112            self.database.remove_from_queue(&item.uuid)?;
113            return Ok(());
114        }
115
116        // Check for temp file pattern
117        if TEMP_PATTERN.is_match(&item.local_file) {
118            debug!("File matches temp pattern, skipping: {}", item.local_file);
119            self.database.remove_from_queue(&item.uuid)?;
120            return Ok(());
121        }
122
123        // Upload file
124        let remote_file = format!("{}{}", item.remote_file, file_extension(&item.local_file));
125        self.send_file(&item.local_file, &remote_file).await?;
126
127        // Add to history
128        self.add_to_history(item)?;
129
130        // Remove from queue
131        self.database.remove_from_queue(&item.uuid)?;
132
133        Ok(())
134    }
135
136
137    async fn send_file(&self, local_file: &str, remote_file: &str) -> Result<()> {
138        let config = &self
139            .config
140            .select_config()
141            .expect("One of configs should always be selected!");
142
143        // Connect via SSH
144        let tcp = TcpStream::connect(format!("{}:{}", config.hostname, config.ssh_port))
145            .context("Failed to connect to SSH server")?;
146        tcp.set_read_timeout(Some(Duration::from_millis(
147            self.config.ssh_connection_timeout,
148        )))?;
149        tcp.set_write_timeout(Some(Duration::from_millis(
150            self.config.ssh_connection_timeout,
151        )))?;
152
153        let mut sess = Session::new()?;
154        sess.set_tcp_stream(tcp);
155        sess.handshake()?;
156
157        // Authenticate
158        let ssh_private_key = if config.ssh_key.is_empty() {
159            ".ssh/id_ed25519"
160        } else {
161            &config.ssh_key
162        };
163
164        sess.userauth_pubkey_file(
165            &config.username,
166            None,
167            Path::new(&home::home_dir().expect("Home dir has to be set!"))
168                .join(ssh_private_key)
169                .as_path(),
170            if config.ssh_key_pass.is_empty() {
171                None
172            } else {
173                Some(&config.ssh_key_pass)
174            },
175        )?;
176
177        if !sess.authenticated() {
178            anyhow::bail!("SSH authentication failed");
179        }
180
181        debug!("SSH connection established");
182
183        // Start SFTP session
184        let sftp = sess.sftp()?;
185        debug!("SFTP session started");
186
187        // Check remote file
188        let local_size = local_file_size(local_file)?;
189        let remote_size = sftp
190            .stat(Path::new(remote_file))
191            .map(|stat| stat.size.unwrap_or(0))
192            .unwrap_or(0);
193
194        debug!(
195            "Local file: {local_file} ({local_size}); Remote file: {remote_file} ({remote_size})"
196        );
197
198        if remote_size > 0 && remote_size == local_size {
199            info!("Found file of same size already uploaded. Skipping");
200            return Ok(());
201        }
202
203        // Upload file
204        let mut local = BufReader::new(File::open(local_file)?);
205        let mut remote = sftp.create(Path::new(remote_file))?;
206
207        stream_file_to_remote(
208            &mut local,
209            &mut remote,
210            self.config.sftp_buffer_size,
211            local_size,
212        )?;
213
214        if self.config.notifications.upload {
215            let _ = notification(
216                "Uploaded successfully.",
217                "upload",
218                &self.config.notifications,
219                &self.config.sounds,
220            );
221        }
222        Ok(())
223    }
224
225
226    fn add_to_history(&self, queue_item: &QueueItem) -> Result<()> {
227        let config = self
228            .config
229            .select_config()
230            .expect("One of configs should always be selected!");
231        let content = format!(
232            "{}{}{}",
233            config.address,
234            queue_item.uuid,
235            file_extension(&queue_item.local_file)
236        );
237
238        // Check if already in history
239        let history = self.database.get_history(None)?;
240        let exists = history.iter().any(|h| h.content.contains(&content));
241
242        if !exists {
243            let history_item = History {
244                content,
245                timestamp: chrono::Local::now().timestamp(),
246                file: queue_item.local_file.clone(),
247                uuid: uuid::Uuid::new_v4().to_string(),
248            };
249            self.database.add_history(&history_item)?;
250        }
251
252        Ok(())
253    }
254}