sftp_server/
lib.rs

1#![allow(unused_parens)]
2#![allow(non_upper_case_globals)]
3#[macro_use] extern crate async_trait;
4
5use std::collections::HashMap;
6use std::collections::VecDeque;
7use std::sync::Arc;
8
9use futures::executor::block_on;
10#[cfg(feature = "standalone")]
11use futures::future::{ready, Ready};
12
13use tokio::io::AsyncReadExt;
14use tokio::io::AsyncSeekExt;
15use tokio::io::AsyncWriteExt;
16use tokio::io::SeekFrom;
17use tokio::sync::Mutex;
18
19use bincode::Options;
20
21#[cfg(feature = "standalone")]
22use thrussh::{
23	ChannelId,
24	server::{
25		Auth,
26		Handle,
27		Handler,
28		Response,
29		Session
30	}
31};
32
33use uuid::Uuid;
34
35use sftp_protocol::packet;
36use sftp_protocol::packet::name::File;
37use sftp_protocol::packet::open::OpenFlags;
38use sftp_protocol::packet::PayloadTrait;
39use sftp_protocol::packet::status::StatusType;
40use sftp_protocol::Packet;
41use sftp_protocol::Payload;
42use sftp_protocol::parser::Parser;
43
44pub use sftp_protocol::common;
45
46pub mod backend;
47use backend::Backend;
48mod error;
49pub use error::Error;
50pub mod file;
51use file::OpenFile;
52
53#[derive(Clone)]
54pub struct Server<B: Backend> {
55	backend: B,
56
57	#[cfg(feature = "standalone")]
58	pub clients: Arc<std::sync::Mutex<HashMap<(usize, ChannelId), Handle>>>,
59	pub id: usize,
60
61	#[allow(clippy::type_complexity)]
62	// In order to support large directories without blowing up, this may end up needing to hold a Stream<Item=File> instead of VecDeque<File>; for now this is fine.
63	open_dirs: Arc<Mutex<HashMap<Uuid, (VecDeque<File>, usize)>>>,
64	open_files: Arc<Mutex<HashMap<Uuid, OpenFile>>>,
65	#[cfg(feature = "standalone")]
66	parsers: HashMap<ChannelId, Parser>,
67}
68
69impl<B: Backend> Server<B> {
70	pub fn new(backend: B, id: usize) -> Self /* {{{ */ {
71		Self{
72			backend,
73			#[cfg(feature = "standalone")]
74			clients: Arc::new(std::sync::Mutex::new(HashMap::new())),
75			id,
76			open_dirs: Arc::new(Mutex::new(HashMap::new())),
77			open_files: Arc::new(Mutex::new(HashMap::new())),
78			#[cfg(feature = "standalone")]
79			parsers: HashMap::new(),
80		}
81	} // }}}
82
83	async fn process_request(&self, input: Packet) -> Result<Packet, Error> /* {{{ */ {
84		let output = match input.payload {
85			Payload::Init(_) => Payload::version(3, vec![]).into_packet(),
86			Payload::Version(_) => unreachable!(),
87			Payload::Open(r) => /* {{{ */ {
88				let path = r.path;
89				let result = self.backend.open(
90					&path,
91					r.pflags.contains(OpenFlags::Read),
92					r.pflags.contains(OpenFlags::Write),
93					r.pflags.contains(OpenFlags::Append),
94					r.pflags.contains(OpenFlags::Create),
95					r.pflags.contains(OpenFlags::Truncate),
96					r.pflags.contains(OpenFlags::Exclude)
97				).await;
98				let response = match result {
99					Ok(v) => {
100						let response = Payload::handle(r.id);
101						let mut state = self.open_files.lock().await;
102						state.insert(response.handle, v);
103						Payload::Handle(response)
104					},
105					Err(e) => {
106						eprintln!("!!! Failed to open file: {:?}", e);
107						Payload::status(r.id, StatusType::Failure, format!("Failed to open file: {}", e))
108					}
109				};
110				response.into_packet()
111			}, // }}}
112			Payload::Close(r) => /* {{{ */ {
113				let mut files = self.open_files.lock().await;
114				let response = match files.remove(&r.handle) {
115					Some(_) => Payload::status(r.id, StatusType::OK, "OK"),
116					None => match self.open_dirs.lock().await.remove(&r.handle) {
117						Some(_) => Payload::status(r.id, StatusType::OK, "OK"),
118						None => Payload::status(r.id, StatusType::NoSuchFile, format!("Handle {} does not exist", &r.handle))
119					}
120				};
121				response.into_packet()
122			}, // }}}
123			Payload::Read(r) => /* {{{ */ {
124				let mut state = self.open_files.lock().await;
125				let response = match state.get_mut(&r.handle) {
126					Some(ref mut file) => {
127						let mut packet = Payload::data_with_size(r.id, r.len);
128						file.seek(SeekFrom::Start(r.offset)).await?;
129						let count = file.read(&mut packet.data).await?;
130						if(count == 0) {
131							Payload::status(r.id, StatusType::EOF, "EOF")
132						} else {
133							packet.data.truncate(count);
134							Payload::Data(packet)
135						}
136					},
137					None => Payload::status(r.id, StatusType::NoSuchFile, "No such file")
138				};
139				response.into_packet()
140			}, // }}}
141			Payload::Write(r) => /* {{{ */ {
142				let mut state = self.open_files.lock().await;
143				let response = match state.get_mut(&r.handle) {
144					Some(ref mut file) => {
145						// TODO:  When attempting to seek past the end of the (existing), zerofill the gap
146						file.seek(SeekFrom::Start(r.offset)).await?;
147						let mut written = 0;
148						while(written < r.data.len()) {
149							let count = file.write(&r.data[written..]).await?;
150							written += count;
151						}
152						if(written < r.data.len()) {
153							Payload::status(r.id, StatusType::EOF, "EOF")
154						} else {
155							Payload::status(r.id, StatusType::OK, "OK")
156						}
157					},
158					None => Payload::status(r.id, StatusType::NoSuchFile, "No such file")
159				};
160				response.into_packet()
161			}, // }}}
162			Payload::Lstat(r) => /* {{{ */ {
163				// TODO:  Don't follow symlinks
164				let mut attrs = Payload::attrs(r.id);
165				attrs.attrs = self.backend.metadata(&r.path).await?.into();
166				attrs.into_packet()
167			}, // }}}
168			Payload::Fstat(r) => /* {{{ */ {
169				let open_files = self.open_files.lock().await;
170				let response = match open_files.get(&r.handle) {
171					Some(v) => {
172						let mut attrs = Payload::attrs(r.id);
173						attrs.attrs = v.metadata.clone().into();
174						Payload::Attrs(attrs)
175					},
176					None => Payload::status(r.id, StatusType::NoSuchFile, "Handle not found")
177				};
178				response.into_packet()
179			}, // }}}
180			Payload::SetStat(r) => /* {{{ */ {
181				let response = match self.backend.set_metadata(&r.path, r.attrs.get_uid_gid(), r.attrs.get_permissions(), r.attrs.get_atime_mtime()).await {
182					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
183					Err(e) => {
184						eprintln!("!!! Failed to set metadata on {}:  {:?}", &r.path, e);
185						Payload::status(r.id, StatusType::Failure, format!("Failed to set metadata: {}", e))
186					}
187				};
188				response.into_packet()
189			}, // }}}
190			Payload::FSetStat(r) => /* {{{ */ {
191				let open_files = self.open_files.lock().await;
192				let response = match open_files.get(&r.handle) {
193					Some(v) => match self.backend.set_metadata(&v.metadata.path, r.attrs.get_uid_gid(), r.attrs.get_permissions(), r.attrs.get_atime_mtime()).await {
194						Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
195						Err(e) => {
196							eprintln!("!!! Failed to set metadata on {}:  {}", &v.metadata.path, e);
197							Payload::status(r.id, StatusType::Failure, format!("Failed to set metadata: {}", e))
198						}
199					},
200					None => Payload::status(r.id, StatusType::NoSuchFile, "Handle not found")
201				};
202				response.into_packet()
203			}, // }}}
204			Payload::OpenDir(r) => /* {{{ */ {
205				let response = Payload::handle(r.id);
206				let contents = self.backend.list(&r.path).await?;
207				self.open_dirs.lock().await.insert(
208					response.handle, (
209						contents.into_iter().map(|f| File::new((&f).into(), f.path)).collect(),
210						0
211					)
212				);
213				response.into_packet()
214			}, // }}}
215			Payload::ReadDir(r) => /* {{{ */ {
216				let mut state = self.open_dirs.lock().await;
217				match state.get_mut(&r.handle) {
218					Some((ref mut files, ref mut index)) => {
219						// TODO:  I would imagine that there's a limit on the response size.  We'll end up needing to chunk.
220						if(*index >= files.len()) {
221							Payload::status(r.id, StatusType::EOF, "EOF").into_packet()
222						} else {
223							let mut payload = Payload::name(r.id);
224							if(!files.is_empty()) {
225								payload.files = files.clone().into();
226							}
227							*index += payload.files.len();
228							payload.into_packet()
229						}
230					},
231					None => Payload::status(r.id, StatusType::EOF, "EOF").into_packet()
232				}
233			}, // }}}
234			Payload::Remove(r) => /* {{{ */ {
235				let response = match self.backend.delete_file(&r.path).await {
236					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
237					Err(e) => Payload::status(r.id, StatusType::Failure, format!("Failed to delete file: {}", e))
238				};
239				response.into_packet()
240			}, // }}}
241			Payload::MkDir(r) => /* {{{ */ {
242				let response = match self.backend.mkdir(&r.path).await {
243					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
244					Err(e) => Payload::status(r.id, StatusType::Failure, format!("Failed to create directory: {}", e))
245				};
246				response.into_packet()
247			}, // }}}
248			Payload::RmDir(r) => /* {{{ */ {
249				let response = match self.backend.rmdir(&r.path).await {
250					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
251					Err(e) => Payload::status(r.id, StatusType::Failure, format!("Failed to delete directory: {}", e))
252				};
253				response.into_packet()
254			}, // }}}
255			Payload::RealPath(r) => /* {{{ */ {
256				let mut name = packet::name::Name::new(r.id);
257				let normalized = self.backend.normalize_path(&r.path)?;
258				name.append_file(&normalized, self.backend.metadata(&r.path).await?.into());
259				name.into_packet()
260			}, // }}}
261			Payload::Stat(r) => /* {{{ */ {
262				// TODO:  Follow symlinks
263				match self.backend.metadata(&r.path).await {
264					Ok(v) => {
265						let mut attrs = Payload::attrs(r.id);
266						attrs.attrs = v.into();
267						attrs.into_packet()
268					},
269					Err(Error::IO(e)) if e.kind() == std::io::ErrorKind::InvalidInput => Payload::status(r.id, StatusType::NoSuchFile, "File not found").into_packet(),
270					Err(e) => Payload::status(r.id, StatusType::Failure, e.to_string()).into_packet()
271				}
272			}, // }}}
273			Payload::Rename(r) => /* {{{ */ {
274				let response = match self.backend.rename(&r.oldpath, &r.newpath).await {
275					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
276					Err(e) => Payload::status(r.id, StatusType::Failure, format!("Failed to rename: {}", e))
277				};
278				response.into_packet()
279			}, // }}}
280			Payload::ReadLink(r) => /* {{{ */ {
281				match self.backend.readlink(&r.path).await {
282					Ok(Some(path)) => {
283						let mut response = Payload::name(r.id);
284						response.append_file(&path, self.backend.metadata(&path).await?.into());
285						response.into_packet()
286					},
287					Ok(None) => Payload::status(r.id, StatusType::NoSuchFile, "File not found").into_packet(),
288					Err(e) => Payload::status(r.id, StatusType::Failure, e.to_string()).into_packet()
289				}
290			}, // }}}
291			Payload::Symlink(r) => /* {{{ */ {
292				let response = match self.backend.mklink(&r.targetpath, &r.linkpath, LinkType::Symbolic).await {
293					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
294					Err(Error::IO(e)) if e.kind() == std::io::ErrorKind::InvalidInput => Payload::status(r.id, StatusType::NoSuchFile, "File not found"),
295					Err(e) => Payload::status(r.id, StatusType::Failure, e.to_string())
296				};
297				response.into_packet()
298			}, // }}}
299			Payload::Status(_) => unreachable!(),
300			Payload::Handle(_) => unreachable!(),
301			Payload::Data(_) => unreachable!(),
302			Payload::Name(_) => unreachable!(),
303			Payload::Attrs(_) => unreachable!(),
304			Payload::Extended(_) => unimplemented!(),
305			Payload::ExtendedReply(_) => unreachable!()
306		};
307		Ok(output)
308	} // }}}
309
310	async fn process_packet(&mut self, packet: Packet) -> Result<Option<Vec<u8>>, Error> /* {{{ */ {
311		let se = bincode::DefaultOptions::new().with_big_endian().with_fixint_encoding();
312		let response = self.process_request(packet).await?;
313		let response_bytes = se.serialize(&response)?;
314		Ok(Some(response_bytes))
315	} // }}}
316
317	#[cfg(not(feature = "standalone"))]
318	pub async fn run(&mut self) -> Result<(), Error> /* {{{ */ {
319		let mut buf = [0u8; 8192];
320		let mut packet_stream = Parser::default();
321		let mut stdin = tokio::io::stdin();
322		let mut stdout = tokio::io::stdout();
323		// TODO:  This needs a way to be terminated
324		loop {
325			let count = stdin.read(&mut buf).await?;
326			if(count == 0) {
327				tokio::time::sleep(std::time::Duration::from_millis(10)).await;
328				continue;
329			}
330			if let Err(e) = packet_stream.write(&buf[0..count]) {
331				eprintln!("!!! run():  Failed to write packet to buffer:  {:?}", e);
332				continue;
333			}
334			let packet = match packet_stream.get_packet() {
335				Ok(None) => continue,
336				Ok(Some(v)) => v,
337				Err(e) => {
338					eprintln!("!!! run():  Failed to parse packet:  {:?}", e);
339					continue;
340				}
341			};
342			let response = match block_on(self.process_packet(packet)) {
343				Ok(Some(v)) => v,
344				Ok(None) => continue,
345				Err(e) => {
346					eprintln!("!!! run():  Failed to process packet:  {:?}", e);
347					continue;
348				}
349			};
350			stdout.write_all(&response).await?;
351			stdout.flush().await?;
352		}
353	} // }}}
354}
355
356#[cfg(feature = "standalone")]
357impl<B: Backend> thrussh::server::Server for Server<B> {
358	type Handler = Self;
359	fn new(&mut self, _: Option<std::net::SocketAddr>) -> Self /* {{{ */ {
360		let s = self.clone();
361		self.id += 1;
362		s
363	} // }}}
364}
365
366#[cfg(feature = "standalone")]
367impl<B: Backend> Handler for Server<B> {
368	type Error = Error;
369	type FutureAuth = Ready<Result<(Self, Auth), Error>>;
370	type FutureUnit = Ready<Result<(Self, Session), Error>>;
371	type FutureBool = Ready<Result<(Self, Session, bool), Error>>;
372
373	fn finished_auth(self, auth: Auth) -> Self::FutureAuth /* {{{ */ {
374		ready(Ok((self, auth)))
375	} // }}}
376
377	fn finished_bool(self, result: bool, session: Session) -> Self::FutureBool /* {{{ */ {
378		ready(Ok((self, session, result)))
379	} // }}}
380
381	fn finished(self, session: Session) -> Self::FutureUnit /* {{{ */ {
382		ready(Ok((self, session)))
383	} // }}}
384
385	fn channel_open_session(self, channel: ChannelId, session: Session) -> Self::FutureUnit /* {{{ */ {
386		{
387			let mut clients = self.clients.lock().unwrap();
388			clients.insert((self.id, channel), session.handle());
389		}
390		self.finished(session)
391	} // }}}
392
393	fn auth_publickey(self, _: &str, _: &thrussh_keys::key::PublicKey) -> Self::FutureAuth /* {{{ */ {
394		// TODO:  Actually validate authenticaiton.
395		eprintln!("auth key success");
396		self.finished_auth(Auth::Accept)
397	} // }}}
398
399	fn auth_keyboard_interactive(self, user: &str, submethods: &str, response: Option<Response>) -> Self::FutureAuth /* {{{ */ {
400		// TODO:  Actually validate authentication.
401		eprintln!("auth_keyboard_interactive('{}', '{}', {:?})", user, submethods, response);
402		eprintln!("auth int success");
403		self.finished_auth(Auth::Accept)
404	} // }}}
405
406	fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit /* {{{ */ {
407		let stream = self.parsers.entry(channel).or_default();
408		if let Err(e) = stream.write(data) {
409			return ready(Err(e.into()));
410		}
411		let packet = match stream.get_packet() {
412			Ok(None) => return self.finished(session),
413			Ok(Some(v)) => v,
414			Err(e) => {
415				eprintln!("!!! data():  Failed to parse packet in channel {:?}:  {:?}", channel, e);
416				return ready(Err(e.into()));
417			}
418		};
419		let response = match block_on(self.process_packet(packet)) {
420			Ok(Some(v)) => v,
421			Ok(None) => return self.finished(session),
422			Err(e) => {
423				eprintln!("!!! data():  Failed to process packet in channel {:?}:  {:?}", channel, e);
424				return ready(Err(e));
425			}
426		};
427		session.data(channel, response.into());
428		self.finished(session)
429	} // }}}
430
431	fn subsystem_request(self, _channel: ChannelId, _name: &str, session: Session) -> Self::FutureUnit /* {{{ */ {
432		// TODO:  We should keep state at the Server level for whether or not
433		//    the SFTP subsystem has indeed been requested for a given channel
434		//    ID.
435		self.finished(session)
436	} // }}}
437}
438
439#[derive(Debug)]
440pub enum LinkType {
441	Symbolic,
442	Hard
443}
444