small_bin/
sftp.rs

use crate::{
    config::AppConfig,
    database::{Database, History, QueueItem},
    notification::notification,
    utils, *,
};
use anyhow::{Context, Result};
use chrono::Utc;
use ssh2::Session;
use std::{
    fs::File,
    io::BufReader,
    net::{TcpStream, ToSocketAddrs},
    path::Path,
    sync::Arc,
    time::Duration,
};
use tokio::{sync::mpsc, time};
12
13#[derive(Debug)]
14pub struct SftpManager {
15    config: Arc<AppConfig>,
16    database: Arc<Database>,
17}
18
19
20impl SftpManager {
21    pub fn new(
22        config: Arc<AppConfig>,
23        database: Arc<Database>,
24    ) -> (Self, mpsc::UnboundedReceiver<()>) {
25        let (_tx, rx) = mpsc::unbounded_channel();
26        (
27            SftpManager {
28                config,
29                database,
30            },
31            rx,
32        )
33    }
34
35
36    pub async fn start(self: Arc<Self>) {
37        let mut interval =
38            time::interval(Duration::from_millis(self.config.fs_check_interval));
39
40        info!(
41            "Starting SFTP queue processor with check interval: {}ms",
42            self.config.fs_check_interval
43        );
44
45        loop {
46            interval.tick().await;
47            if let Err(e) = self.process_queue().await {
48                error!("Error processing queue: {e:?}");
49            }
50        }
51    }
52
53
54    async fn process_queue(&self) -> Result<()> {
55        let queue = self.database.get_queue()?;
56        if !queue.is_empty() {
57            // Build clipboard content
58            self.build_clipboard(&queue)?;
59
60            // Process each queue item
61            for item in &queue {
62                if let Err(e) = self.process_element(item).await {
63                    error!("Error processing queue element: {e:?}");
64                }
65            }
66        }
67        Ok(())
68    }
69
70
71    fn build_clipboard(&self, queue: &[QueueItem]) -> Result<()> {
72        let links: Vec<String> = queue
73            .iter()
74            .map(|item| {
75                let config = self
76                    .config
77                    .select_config()
78                    .expect("One of configs should always be selected!");
79                format!(
80                    "{}{}{}",
81                    config.address,
82                    item.uuid,
83                    file_extension(&item.local_file)
84                )
85            })
86            .collect();
87
88        let content = links.join(", ");
89        put_to_clipboard(&content)?;
90
91        if self.config.notifications.clipboard {
92            let _ = notification(
93                "Link copied to clipboard",
94                "clipboard",
95                &self.config.notifications,
96                &self.config.sounds,
97            );
98        }
99
100        Ok(())
101    }
102
103
104    async fn process_element(&self, item: &QueueItem) -> Result<()> {
105        let path = Path::new(&item.local_file);
106
107        // Check if file exists and is regular
108        if !path.exists() || !path.is_file() {
109            debug!(
110                "Local file not found or not a regular file: {}. Skipping.",
111                item.local_file
112            );
113            self.database.remove_from_queue(&item.uuid)?;
114            return Ok(());
115        }
116
117        // Check for temp file pattern
118        if TEMP_PATTERN.is_match(&item.local_file) {
119            debug!("File matches temp pattern, skipping: {}", item.local_file);
120            self.database.remove_from_queue(&item.uuid)?;
121            return Ok(());
122        }
123
124        // Upload file
125        let remote_file = format!("{}{}", item.remote_file, file_extension(&item.local_file));
126        self.send_file(&item.local_file, &remote_file).await?;
127
128        // Add to history
129        self.add_to_history(item)?;
130
131        // Remove from queue
132        self.database.remove_from_queue(&item.uuid)?;
133
134        Ok(())
135    }
136
137
138    async fn send_file(&self, local_file: &str, remote_file: &str) -> Result<()> {
139        let config = &self
140            .config
141            .select_config()
142            .expect("One of configs should always be selected!");
143
144        // Connect via SSH
145        let tcp = TcpStream::connect(format!("{}:{}", config.hostname, config.ssh_port))
146            .context("Failed to connect to SSH server")?;
147        tcp.set_read_timeout(Some(Duration::from_millis(
148            self.config.ssh_connection_timeout,
149        )))?;
150        tcp.set_write_timeout(Some(Duration::from_millis(
151            self.config.ssh_connection_timeout,
152        )))?;
153
154        let mut sess = Session::new()?;
155        sess.set_tcp_stream(tcp);
156        sess.handshake()?;
157
158        // Authenticate
159        let ssh_private_key = if config.ssh_key.is_empty() {
160            ".ssh/id_ed25519"
161        } else {
162            &config.ssh_key
163        };
164
165        sess.userauth_pubkey_file(
166            &config.username,
167            None,
168            Path::new(&home::home_dir().expect("Home dir has to be set!"))
169                .join(ssh_private_key)
170                .as_path(),
171            if config.ssh_key_pass.is_empty() {
172                None
173            } else {
174                Some(&config.ssh_key_pass)
175            },
176        )?;
177
178        if !sess.authenticated() {
179            anyhow::bail!("SSH authentication failed");
180        }
181
182        debug!("SSH connection established");
183
184        // Start SFTP session
185        let sftp = sess.sftp()?;
186        debug!("SFTP session started");
187
188        // Check remote file
189        let local_size = local_file_size(local_file)?;
190        let remote_size = sftp
191            .stat(Path::new(remote_file))
192            .map(|stat| stat.size.unwrap_or(0))
193            .unwrap_or(0);
194
195        debug!(
196            "Local file: {local_file} ({local_size}); Remote file: {remote_file} ({remote_size})"
197        );
198
199        if remote_size > 0 && remote_size == local_size {
200            info!("Found file of same size already uploaded. Skipping");
201            return Ok(());
202        }
203
204        // Upload file
205        let mut local = BufReader::new(File::open(local_file)?);
206        let mut remote = sftp.create(Path::new(remote_file))?;
207
208        stream_file_to_remote(
209            &mut local,
210            &mut remote,
211            self.config.sftp_buffer_size,
212            local_size,
213        )?;
214
215        if self.config.notifications.upload {
216            let _ = notification(
217                "Uploaded successfully.",
218                "upload",
219                &self.config.notifications,
220                &self.config.sounds,
221            );
222        }
223        Ok(())
224    }
225
226
227    fn add_to_history(&self, queue_item: &QueueItem) -> Result<()> {
228        let config = self
229            .config
230            .select_config()
231            .expect("One of configs should always be selected!");
232        let content = format!(
233            "{}{}{}",
234            config.address,
235            queue_item.uuid,
236            file_extension(&queue_item.local_file)
237        );
238
239        // Check if already in history
240        let history = self.database.get_history(None)?;
241        let exists = history.iter().any(|h| h.content.contains(&content));
242
243        if !exists {
244            let history_item = History {
245                content,
246                timestamp: Utc::now().timestamp(),
247                file: queue_item.local_file.clone(),
248                uuid: uuid::Uuid::new_v4().to_string(),
249            };
250            self.database.add_history(&history_item)?;
251        }
252
253        Ok(())
254    }
255}