1#![allow(clippy::mutable_key_type)]
2#![allow(clippy::match_like_matches_macro)]
3
4use backend::Backend;
5use bytes::Bytes;
6use digest::Digest;
7use scutiger_core::errors::{Error, ErrorKind};
8use scutiger_core::pktline;
9use sha2::Sha256;
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt;
13use std::io;
14use std::io::Write;
15use std::iter::FromIterator;
16use std::path::{Path, PathBuf};
17use std::str::FromStr;
18
/// Response to a single protocol command.
///
/// `PktLineHandler::send_status` writes it as a `status NNN` line followed by
/// the argument lines; then, after a delimiter packet, either the message
/// lines or the bytes copied from `reader`; and finally a flush packet.
pub struct Status {
    /// Numeric status code, rendered zero-padded to three digits on the wire.
    code: u32,
    /// Lines sent directly after the status line, before any delimiter.
    args: Option<Vec<Bytes>>,
    /// Lines sent after the delimiter; when present, `reader` is not consulted.
    messages: Option<Vec<Bytes>>,
    /// Stream copied to the peer after the delimiter when `messages` is `None`.
    reader: Option<Box<dyn io::Read>>,
}
25
26impl Status {
27 pub fn success() -> Status {
28 Status {
29 code: 200,
30 args: None,
31 messages: None,
32 reader: None,
33 }
34 }
35
36 pub fn new_success(messages: Vec<Bytes>) -> Status {
37 Status {
38 code: 200,
39 args: None,
40 messages: Some(messages),
41 reader: None,
42 }
43 }
44
45 pub fn new_success_with_code(code: u32, args: Vec<Bytes>) -> Status {
46 Status {
47 code,
48 args: Some(args),
49 messages: None,
50 reader: None,
51 }
52 }
53
54 pub fn new_success_with_data(code: u32, args: Vec<Bytes>, messages: Vec<Bytes>) -> Status {
55 Status {
56 code,
57 args: Some(args),
58 messages: Some(messages),
59 reader: None,
60 }
61 }
62
63 pub fn new_reader(args: Vec<Bytes>, reader: Box<dyn io::Read>) -> Status {
64 Status {
65 code: 200,
66 args: Some(args),
67 messages: None,
68 reader: Some(reader),
69 }
70 }
71
72 pub fn new_failure(code: u32, message: &[u8]) -> Status {
73 Status {
74 code,
75 args: None,
76 messages: Some(vec![message.into()]),
77 reader: None,
78 }
79 }
80
81 pub fn new_failure_with_args(code: u32, args: Vec<Bytes>, message: &[u8]) -> Status {
82 Status {
83 code,
84 args: Some(args),
85 messages: Some(vec![message.into()]),
86 reader: None,
87 }
88 }
89}
90
91impl FromIterator<Bytes> for Status {
92 fn from_iter<I: IntoIterator<Item = Bytes>>(iter: I) -> Self {
93 Self::new_success(iter.into_iter().collect())
94 }
95}
96
/// Bundles a pkt-line reader and a pkt-line writer so that requests can be
/// read and responses written through a single value.
pub struct PktLineHandler<R: io::Read, W: io::Write> {
    /// Incoming side; public so callers can pull packets from it directly.
    pub rdr: pktline::Reader<R>,
    /// Outgoing side; only written to through this type's methods.
    wrtr: pktline::Writer<W>,
}
101
102impl<R: io::Read, W: io::Write> PktLineHandler<R, W> {
103 pub fn new(rdr: R, wrtr: W) -> Self {
104 PktLineHandler {
105 rdr: pktline::Reader::new(rdr),
106 wrtr: pktline::Writer::new(wrtr),
107 }
108 }
109
110 pub fn read_to_type(&mut self, typ: pktline::PacketType) -> Result<Vec<Bytes>, Error> {
111 self.rdr
112 .iter()
113 .take_while(|pkt| match pkt {
114 Ok(pkt) if pkt.packet_type() == typ => false,
115 _ => true,
116 })
117 .map(|pkt| pkt.map(|p| p.data().unwrap_or(b"").into()))
118 .collect()
119 }
120
121 pub fn read_to_delim(&mut self) -> Result<Vec<Bytes>, Error> {
122 self.read_to_type(pktline::PacketType::Delim)
123 }
124
125 pub fn read_to_flush(&mut self) -> Result<Vec<Bytes>, Error> {
126 self.read_to_type(pktline::PacketType::Flush)
127 }
128
129 pub fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
130 self.wrtr.write_all(msg)?;
131 Ok(())
132 }
133
134 pub fn flush(&mut self) -> Result<(), Error> {
135 let pkt = pktline::Packet::new(pktline::PacketType::Flush, b"");
136 self.wrtr.write_packet(&pkt)?;
137 Ok(())
138 }
139
140 pub fn delim(&mut self) -> Result<(), Error> {
141 let pkt = pktline::Packet::new(pktline::PacketType::Delim, b"");
142 self.wrtr.write_packet(&pkt)?;
143 Ok(())
144 }
145
146 pub fn send_error(&mut self, status: u32, msg: &str) -> Result<(), Error> {
147 self.send(format!("status {:03}\n", status).as_bytes())?;
148 self.delim()?;
149 self.send(msg.as_bytes())?;
150 self.flush()?;
151 Ok(())
152 }
153
154 pub fn send_status(&mut self, status: Status) -> Result<(), Error> {
155 let mut status = status;
156 self.send(format!("status {:03}\n", status.code).as_bytes())?;
157 if let Some(ref args) = status.args {
158 for arg in args.iter() {
159 self.send(arg)?;
160 }
161 }
162 if let Some(ref messages) = status.messages {
163 self.delim()?;
164 for msg in messages.iter() {
165 self.send(msg)?;
166 }
167 } else if let Some(ref mut reader) = status.reader {
168 self.delim()?;
169 io::copy(reader, &mut self.wrtr)?;
170 }
171 self.flush()?;
172 Ok(())
173 }
174}
175
/// Direction of the session being served.
///
/// `Processor::process_commands` consults it to decide which commands are
/// accepted: `put-object`, `verify-object`, `lock` and `unlock` are only
/// served in `Upload` mode, `get-object` only in `Download` mode, and
/// `batch`/`list-lock` behave differently depending on the mode.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mode {
    /// The peer is sending objects to this side.
    Upload,
    /// The peer is fetching objects from this side.
    Download,
}
181
/// Git LFS object ID.
///
/// `Oid::new` only accepts exactly 64 lowercase hexadecimal characters, so
/// the value is stored as a `String` and can be sliced by byte offsets.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Hash)]
pub struct Oid {
    /// The hexadecimal text of the object ID.
    oid: String,
}
187
188impl Oid {
189 pub fn new(oid: &[u8]) -> Result<Self, Error> {
190 if Self::valid(oid) {
191 Ok(Oid {
194 oid: String::from_utf8_lossy(oid).into(),
195 })
196 } else {
197 Err(Error::new_simple(ErrorKind::InvalidLFSOid))
198 }
199 }
200
201 pub fn as_str(&self) -> &str {
202 &self.oid
203 }
204
205 pub fn value(&self) -> &[u8] {
206 self.oid.as_bytes()
207 }
208
209 pub fn valid(b: &[u8]) -> bool {
210 b.len() == 64
211 && b.iter()
212 .all(|&x| (b'0'..=b'9').contains(&x) || (b'a'..=b'f').contains(&x))
213 }
214
215 pub fn expected_path(&self, path: &Path) -> PathBuf {
218 let mut buf = path.to_path_buf();
219 buf.push("objects");
220 buf.push(&self.oid[0..2]);
221 buf.push(&self.oid[2..4]);
222 buf.push(&self.oid);
223 buf
224 }
225
226 pub fn exists_at_path(&self, path: &Path) -> bool {
229 self.expected_path(path).is_file()
230 }
231
232 pub fn size_at_path(&self, path: &Path) -> Option<u64> {
235 self.expected_path(path).metadata().ok().map(|x| x.len())
236 }
237}
238
239impl fmt::Display for Oid {
240 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
241 write!(f, "{}", self.oid)
242 }
243}
244
/// One entry of a batch response as produced by the backend.
pub struct BatchItem {
    /// Object the entry refers to.
    pub oid: Oid,
    /// Size printed for the object in the batch response line.
    pub size: u64,
    /// Selects which action the response line advertises for this object
    /// (the "present" action when `true`, the "missing" action otherwise).
    pub present: bool,
}
250
/// Stateless namespace for parsing `key=value\n` argument lines.
pub struct ArgumentParser {}
252
253impl ArgumentParser {
254 pub fn parse(args: &[Bytes]) -> Result<BTreeMap<Bytes, Bytes>, Error> {
255 let mut map = BTreeMap::new();
256 for item in args {
257 let equals = match item.iter().position(|&x| x == b'=') {
258 Some(x) => x,
259 None => {
260 return Err(Error::from_message(
261 ErrorKind::ParseError,
262 "unexpected value parsing argument (missing equals)",
263 ));
264 }
265 };
266 if item[item.len() - 1] != b'\n' {
267 return Err(Error::from_message(
268 ErrorKind::ParseError,
269 "unexpected value parsing argument (missing newline)",
270 ));
271 }
272 if map
273 .insert(
274 item[0..equals].into(),
275 item[equals + 1..item.len() - 1].into(),
276 )
277 .is_some()
278 {
279 return Err(Error::from_message(
280 ErrorKind::ExtraData,
281 "unexpected duplicate key",
282 ));
283 };
284 }
285 Ok(map)
286 }
287
288 pub fn parse_value_as_integer<F: FromStr>(
289 args: &BTreeMap<Bytes, Bytes>,
290 key: &[u8],
291 ) -> Result<F, Error> {
292 match args.get(key) {
293 Some(x) => Self::parse_integer(x),
294 None => Err(Error::from_message(
295 ErrorKind::MissingData,
296 "missing required header",
297 )),
298 }
299 }
300
301 pub fn parse_integer<F: FromStr>(item: &Bytes) -> Result<F, Error> {
302 match String::from_utf8_lossy(item).parse() {
305 Ok(x) => Ok(x),
306 Err(_) => Err(Error::from_message(
307 ErrorKind::InvalidInteger,
308 format!("unexpected value parsing integer: {:?}", item),
309 )),
310 }
311 }
312}
313
/// Reader adapter that feeds every byte it reads into a hash and keeps a
/// running count of the bytes seen.
pub struct HashingReader<'a, R: io::Read, H: digest::Digest + io::Write> {
    /// Underlying source of bytes.
    rdr: &'a mut R,
    /// Hash state updated with each chunk that is read.
    hash: H,
    /// Total number of bytes read so far.
    size: u64,
}
319
320impl<'a, R: io::Read, H: digest::Digest + io::Write> HashingReader<'a, R, H> {
321 pub fn new(rdr: &'a mut R, hash: H) -> Self {
322 HashingReader { rdr, hash, size: 0 }
323 }
324
325 pub fn size(&self) -> u64 {
326 self.size
327 }
328
329 pub fn oid(self) -> Result<Oid, Error> {
330 let hex = hex::encode(self.hash.finalize());
331 Oid::new(hex.as_bytes())
332 }
333}
334
335impl<'a, R: io::Read, H: digest::Digest + io::Write> io::Read for HashingReader<'a, R, H> {
336 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
337 let count = self.rdr.read(buf)?;
338 self.hash.write_all(&buf[0..count])?;
339 self.size += count as u64;
340 Ok(count)
341 }
342}
343
/// Command loop state: the pkt-line connection plus the storage backend that
/// actually services the commands.
pub struct Processor<'a, R: io::Read, W: io::Write> {
    /// Connection used to read commands and write responses.
    handler: PktLineHandler<R, W>,
    /// Backend that performs batch, transfer, verify and lock operations.
    backend: Box<dyn Backend + 'a>,
}
348
349impl<'a, R: io::Read, W: io::Write> Processor<'a, R, W> {
350 pub fn new(handler: PktLineHandler<R, W>, backend: Box<dyn Backend + 'a>) -> Self {
351 Processor { handler, backend }
352 }
353
354 fn version(&mut self) -> Result<Status, Error> {
355 self.handler.read_to_flush()?;
356 Ok(Status::new_success(vec![]))
357 }
358
359 fn error(&self, code: u32, msg: &str) -> Result<Status, Error> {
360 Ok(Status::new_failure(code, msg.as_bytes()))
361 }
362
363 fn read_batch(&mut self, mode: Mode) -> Result<Vec<BatchItem>, Error> {
364 let args = ArgumentParser::parse(&self.handler.read_to_delim()?)?;
365 let data = match self.handler.read_to_flush() {
366 Ok(v) => v,
367 Err(e) => return Err(Error::new(ErrorKind::ParseError, Some(e))),
368 };
369 let hash_algo = args.get(b"hash-algo" as &[u8]);
370 match hash_algo.map(|x| x as &[u8]) {
371 Some(b"sha256") => (),
372 Some(x) => {
373 return Err(Error::from_message(
374 ErrorKind::NotAllowed,
375 format!(
376 "{} is not a permitted hash algorithm",
377 String::from_utf8_lossy(x)
378 ),
379 ))
380 }
381 None => (),
382 }
383 let oids = data
384 .iter()
385 .map(|line| {
386 if line.is_empty() || line[line.len() - 1] != b'\n' {
387 return Err(Error::new_simple(ErrorKind::InvalidPacket));
388 }
389 let pair: Vec<Bytes> = line[0..line.len()]
390 .split(|&b| b == b' ')
391 .map(|x| x.into())
392 .collect();
393 if pair.len() != 2 || pair[1].is_empty() {
394 return Err(Error::new_simple(ErrorKind::ParseError));
395 }
396 let size = &pair[1];
397 let size = match String::from_utf8_lossy(&size[0..size.len() - 1]).parse() {
398 Ok(x) => x,
399 Err(_) => {
400 return Err(Error::from_message(
401 ErrorKind::InvalidInteger,
402 format!("got {:?}", pair[1]),
403 ))
404 }
405 };
406 Ok((Oid::new(&pair[0])?, size))
407 })
408 .collect::<Result<Vec<_>, Error>>();
409 self.backend.batch(mode, &oids?)
410 }
411
412 fn batch_data(
413 &mut self,
414 mode: Mode,
415 present_action: &str,
416 missing_action: &str,
417 ) -> Result<Status, Error> {
418 let batch = self.read_batch(mode)?;
419 Ok(batch
420 .iter()
421 .map(|item| {
422 let size = format!("{}", item.size);
423 let action = if item.present {
424 present_action
425 } else {
426 missing_action
427 };
428 [
429 item.oid.value(),
430 b" ",
431 size.as_bytes(),
432 b" ",
433 action.as_bytes(),
434 b"\n",
435 ]
436 .join(b"" as &[u8])
437 .into()
438 })
439 .collect())
440 }
441
442 fn upload_batch(&mut self) -> Result<Status, Error> {
443 self.batch_data(Mode::Upload, "noop", "upload")
444 }
445
446 fn download_batch(&mut self) -> Result<Status, Error> {
447 self.batch_data(Mode::Download, "download", "noop")
448 }
449
450 fn size_from_arguments(args: &BTreeMap<Bytes, Bytes>) -> Result<u64, Error> {
451 let size = match args.get(b"size" as &[u8]) {
452 Some(x) => x,
453 None => {
454 return Err(Error::from_message(
455 ErrorKind::MissingData,
456 "missing required size header",
457 ))
458 }
459 };
460 ArgumentParser::parse_integer(size)
461 }
462
463 fn put_object(&mut self, oid: &[u8]) -> Result<Status, Error> {
464 let oid = Oid::new(oid)?;
465 let args = ArgumentParser::parse(&self.handler.read_to_delim()?)?;
466 let expected_size = Self::size_from_arguments(&args)?;
467 let mut rdr = HashingReader::new(&mut self.handler.rdr, Sha256::new());
468 let state = self.backend.start_upload(&oid, &mut rdr, &args)?;
469 let actual_size = rdr.size();
470 match actual_size.cmp(&expected_size) {
471 Ordering::Less => {
472 return Err(Error::from_message(
473 ErrorKind::MissingData,
474 format!("expected {} bytes, got {}", expected_size, actual_size),
475 ))
476 }
477 Ordering::Greater => {
478 return Err(Error::from_message(
479 ErrorKind::ExtraData,
480 format!("expected {} bytes, got {}", expected_size, actual_size),
481 ))
482 }
483 Ordering::Equal => (),
484 }
485
486 let actual_oid = rdr.oid()?;
490 if actual_oid != oid {
491 return Err(Error::from_message(
492 ErrorKind::CorruptData,
493 format!("expected oid {}, got {}", oid, actual_oid),
494 ));
495 }
496 self.backend.finish_upload(state)?;
497 Ok(Status::success())
498 }
499
500 fn verify_object(&mut self, oid: &[u8]) -> Result<Status, Error> {
501 let args = ArgumentParser::parse(&self.handler.read_to_flush()?)?;
502 let oid = Oid::new(oid)?;
503 self.backend.verify(&oid, &args)
504 }
505
506 fn get_object(&mut self, oid: &[u8]) -> Result<Status, Error> {
507 let args = ArgumentParser::parse(&self.handler.read_to_flush()?)?;
508 let oid = Oid::new(oid)?;
509 let (rdr, size) = match self.backend.download(&oid, &args) {
510 Ok(x) => x,
511 Err(e) if e.io_kind() == io::ErrorKind::NotFound => {
512 return Ok(Status::new_failure(404, "not found".as_bytes()))
513 }
514 Err(e) => return Err(e),
515 };
516 let args = match size {
517 Some(size) => vec![format!("size={}\n", size).into()],
518 None => vec![],
519 };
520 Ok(Status::new_reader(args, rdr))
521 }
522
523 fn lock(&mut self) -> Result<Status, Error> {
524 let data = match self.handler.read_to_flush() {
525 Ok(v) => v,
526 Err(e) => return Err(Error::new(ErrorKind::ParseError, Some(e))),
527 };
528 let args = ArgumentParser::parse(&data)?;
529 let path = args.get(b"path" as &[u8]);
530 let refname = args.get(b"refname" as &[u8]);
531 let path = match (path, refname) {
532 (Some(path), Some(_)) => path,
533 (_, _) => {
534 return Err(Error::from_message(
535 ErrorKind::MissingData,
536 "both path and refname required",
537 ))
538 }
539 };
540 let lock_backend = self.backend.lock_backend();
541 let mut retried = false;
542 while !retried {
543 let (ok, lock) = match lock_backend.create(path) {
544 Ok(l) => (true, l),
545 Err(e) if e.kind() == ErrorKind::Conflict => match lock_backend.from_path(path) {
546 Ok(Some(l)) => (false, l),
547 Ok(None) => {
548 retried = true;
549 continue;
550 }
551 Err(e) => return Err(e),
552 },
553 Err(e) => return Err(e),
554 };
555 return if ok {
556 Ok(Status::new_success_with_code(201, lock.as_arguments()))
557 } else {
558 Ok(Status::new_failure_with_args(
559 409,
560 lock.as_arguments(),
561 b"conflict",
562 ))
563 };
564 }
565 unreachable!()
566 }
567
568 fn list_locks_for_path(
569 &mut self,
570 path: &Bytes,
571 cursor: Option<&Bytes>,
572 use_owner_id: bool,
573 ) -> Result<Status, Error> {
574 match (self.backend.lock_backend().from_path(path), cursor) {
575 (Err(e), _) => Err(e),
576 (Ok(None), _) => self.error(404, "not found"),
577 (Ok(Some(l)), Some(id)) if l.id().as_bytes() < id => self.error(404, "not found"),
578 (Ok(Some(l)), _) => l.as_lock_spec(use_owner_id).map(Status::new_success),
579 }
580 }
581
582 fn list_locks(&mut self, use_owner_id: bool) -> Result<Status, Error> {
583 let args = match self.handler.read_to_flush() {
584 Ok(v) => v,
585 Err(e) => return Err(Error::new(ErrorKind::ParseError, Some(e))),
586 };
587 let args = ArgumentParser::parse(&args)?;
588 let mut limit = args
589 .get(b"limit" as &[u8])
590 .map(ArgumentParser::parse_integer)
591 .unwrap_or(Ok(100))?;
592 if limit == 0 {
593 return Err(Error::from_message(
594 ErrorKind::NotAllowed,
595 "bizarre request for no data",
596 ));
597 } else if limit > 100 {
598 limit = 100
600 }
601 let cursor = args.get(b"cursor" as &[u8]);
602 if let Some(path) = args.get(b"path" as &[u8]) {
603 return self.list_locks_for_path(path, cursor, use_owner_id);
604 };
605 let r: Result<Vec<_>, _> = self
606 .backend
607 .lock_backend()
608 .iter()
609 .skip_while(|item| match (item, cursor) {
610 (Err(_), _) => false,
611 (Ok(l), Some(cursor)) => l.id().as_bytes() < cursor,
612 (Ok(_), None) => false,
613 })
614 .take(limit + 1)
615 .collect();
616 let items = r?;
617 let lock_specs: Result<Vec<_>, _> =
618 items.iter().map(|l| l.as_lock_spec(use_owner_id)).collect();
619 let lock_specs = lock_specs?.iter().flatten().cloned().collect();
620 let next_cursor: Vec<Bytes> = if items.len() == limit + 1 {
621 vec![format!("next-cursor={}\n", items[limit].id()).into()]
622 } else {
623 vec![]
624 };
625 Ok(Status::new_success_with_data(200, next_cursor, lock_specs))
626 }
627
628 fn unlock(&mut self, id: &[u8]) -> Result<Status, Error> {
629 self.handler.read_to_flush()?;
630 let s = match std::str::from_utf8(id) {
631 Ok(s) => s,
632 Err(_) => {
633 return Err(Error::from_message(
634 ErrorKind::CorruptData,
635 "invalid or corrupt ID",
636 ))
637 }
638 };
639 let lock_backend = self.backend.lock_backend();
640 match lock_backend.from_id(s) {
641 Ok(Some(l)) => {
642 let args = l.as_arguments();
643 match lock_backend.unlock(l) {
644 Ok(()) => Ok(Status::new_success_with_code(200, args)),
645 Err(e) if e.io_kind() == io::ErrorKind::NotFound => {
646 self.error(404, "not found")
647 }
648 Err(e) if e.io_kind() == io::ErrorKind::PermissionDenied => {
649 self.error(403, "forbidden")
650 }
651 Err(e) => Err(e),
652 }
653 }
654 Ok(None) => self.error(404, "not found"),
655 Err(e) => Err(e),
656 }
657 }
658
659 pub fn process_commands(&mut self, mode: Mode) -> Result<(), Error> {
660 loop {
661 let pkt = match self.handler.rdr.read_packet() {
662 Ok(p) => p,
663 Err(e) if e.io_kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
664 Err(e) => return Err(e),
665 };
666 let msgs: Vec<_> = match pkt.data() {
667 Some(b"") => {
668 self.handler.send_error(400, "no command provided")?;
669 continue;
670 }
671 Some(bs) => {
672 let bs = if bs[bs.len() - 1] == b'\n' {
673 &bs[0..bs.len() - 1]
674 } else {
675 bs
676 };
677 bs.split(|&b| b == b' ').collect()
678 }
679 None => {
680 self.handler.send_error(400, "unknown command")?;
681 continue;
682 }
683 };
684 let resp = match (msgs[0], msgs.get(1), mode) {
685 (b"version", Some(&b"1"), _) => self.version(),
686 (b"version", _, _) => self.error(400, "unknown version"),
687 (b"batch", None, Mode::Upload) => self.upload_batch(),
688 (b"batch", None, Mode::Download) => self.download_batch(),
689 (b"put-object", Some(oid), Mode::Upload) => self.put_object(oid),
690 (b"put-object", Some(_), _) => self.error(403, "not allowed"),
691 (b"verify-object", Some(oid), Mode::Upload) => self.verify_object(oid),
692 (b"verify-object", Some(_), _) => self.error(403, "not allowed"),
693 (b"get-object", Some(oid), Mode::Download) => self.get_object(oid),
694 (b"get-object", Some(_), _) => self.error(403, "not allowed"),
695 (b"lock", None, Mode::Upload) => self.lock(),
696 (b"list-lock", None, Mode::Download) => self.list_locks(false),
697 (b"list-lock", None, Mode::Upload) => self.list_locks(true),
698 (b"unlock", Some(id), Mode::Upload) => self.unlock(id),
699 (b"quit", None, _) => {
700 self.handler.send_status(Status::success())?;
701 return Ok(());
702 }
703 (_, _, _) => self.error(400, "unknown command"),
704 };
705 match resp {
706 Ok(st) => self.handler.send_status(st),
707 Err(e) => match e.kind() {
708 ErrorKind::BadPktlineHeader
709 | ErrorKind::InvalidPacket
710 | ErrorKind::UnexpectedPacket
711 | ErrorKind::InvalidLFSOid
712 | ErrorKind::InvalidInteger
713 | ErrorKind::MissingData
714 | ErrorKind::ExtraData
715 | ErrorKind::CorruptData
716 | ErrorKind::NotAllowed
717 | ErrorKind::UnknownCommand => self
718 .handler
719 .send_status(Status::new_failure(400, format!("error: {}", e).as_bytes())),
720 _ => self.handler.send_status(Status::new_failure(
721 500,
722 format!("internal error: {}", e).as_bytes(),
723 )),
724 },
725 }?;
726 }
727 }
728}