mwc_p2p/
msg.rs

1// Copyright 2019 The Grin Developers
2// Copyright 2024 The MWC Developers
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Message types that transit over the network and related serialization code.
17
18use crate::chain::txhashset::BitmapSegment;
19use crate::conn::Tracker;
20use crate::mwc_core::core::hash::Hash;
21use crate::mwc_core::core::transaction::{OutputIdentifier, TxKernel};
22use crate::mwc_core::core::{
23	BlockHeader, Segment, SegmentIdentifier, Transaction, UntrustedBlock, UntrustedBlockHeader,
24	UntrustedCompactBlock,
25};
26use crate::mwc_core::pow::Difficulty;
27use crate::mwc_core::ser::{
28	self, DeserializationMode, ProtocolVersion, Readable, Reader, StreamingReader, Writeable,
29	Writer,
30};
31use crate::mwc_core::{consensus, global};
32use crate::types::{
33	AttachmentMeta, AttachmentUpdate, Capabilities, Error, PeerAddr, ReasonForBan,
34	MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
35};
36use crate::util::secp::pedersen::RangeProof;
37use bytes::Bytes;
38use num::FromPrimitive;
39use std::fs::File;
40use std::io::{Read, Write};
41use std::sync::Arc;
42use std::{fmt, thread, time::Duration};
43
44/// Mwc's user agent with current version
45pub const USER_AGENT: &str = concat!("MW/MWC ", env!("CARGO_PKG_VERSION"));
46
// MWC - the magic numbers were changed for MWC.
// NOTE(review): the original note said "different from mwc"; presumably it
// meant different from upstream Grin - confirm.
/// Magic bytes expected in the header of every message on chain types other
/// than floonet and mainnet.
const OTHER_MAGIC: [u8; 2] = [21, 19];
/// Magic bytes expected in the header of every message on floonet.
const FLOONET_MAGIC: [u8; 2] = [17, 36];
/// Magic bytes expected in the header of every message on mainnet.
const MAINNET_MAGIC: [u8; 2] = [13, 77];
52
53// Types of messages.
54// Note: Values here are *important* so we should only add new values at the
55// end.
56enum_from_primitive! {
57	#[derive(Debug, Clone, Copy, PartialEq)]
58	pub enum Type {
59		Error = 0,
60		Hand = 1,
61		Shake = 2,
62		Ping = 3,
63		Pong = 4,
64		GetPeerAddrs = 5,
65		PeerAddrs = 6,
66		GetHeaders = 7,
67		Header = 8,
68		Headers = 9,
69		GetBlock = 10,
70		Block = 11,
71		GetCompactBlock = 12,
72		CompactBlock = 13,
73		StemTransaction = 14,
74		Transaction = 15,
75		TxHashSetRequest = 16,
76		TxHashSetArchive = 17,
77		BanReason = 18,
78		GetTransaction = 19,
79		TransactionKernel = 20,
80		TorAddress = 23,
81		StartPibdSyncRequest = 24,
82		GetOutputBitmapSegment = 25,
83		OutputBitmapSegment = 26,
84		GetOutputSegment = 27,
85		OutputSegment = 28,
86		GetRangeProofSegment = 29,
87		RangeProofSegment = 30,
88		GetKernelSegment = 31,
89		KernelSegment = 32,
90		HasAnotherArchiveHeader = 33,
91		PibdSyncState = 34,
92		StartHeadersHashRequest = 35,
93		StartHeadersHashResponse = 36,
94		GetHeadersHashesSegment = 37,
95		OutputHeadersHashesSegment = 38,
96	}
97}
98
99/// Max theoretical size of a block filled with outputs.
100fn max_block_size() -> u64 {
101	(global::max_block_weight() / consensus::BLOCK_OUTPUT_WEIGHT * 708) as u64
102}
103
104// Max msg size when msg type is unknown.
105fn default_max_msg_size() -> u64 {
106	max_block_size()
107}
108
109// Max msg size for each msg type.
110fn max_msg_size(msg_type: Type) -> u64 {
111	match msg_type {
112		Type::Error => 0,
113		Type::Hand => 128 + 8,
114		Type::Shake => 88 + 8,
115		Type::Ping => 16,
116		Type::Pong => 16,
117		Type::GetPeerAddrs => 4,
118		Type::PeerAddrs => 4 + (1 + 16 + 2) * MAX_PEER_ADDRS as u64,
119		Type::GetHeaders => 1 + 32 * MAX_LOCATORS as u64,
120		Type::Header => 365,
121		Type::Headers => 2 + 365 * MAX_BLOCK_HEADERS as u64,
122		Type::GetBlock => 32,
123		Type::Block => max_block_size(),
124		Type::GetCompactBlock => 32,
125		Type::CompactBlock => max_block_size() / 10,
126		Type::StemTransaction => max_block_size(),
127		Type::Transaction => max_block_size(),
128		Type::TxHashSetRequest => 40, // 32+8=40
129		Type::TxHashSetArchive => 64,
130		Type::BanReason => 64,
131		Type::GetTransaction => 32,
132		Type::TransactionKernel => 32,
133		Type::TorAddress => 128,
134		Type::StartHeadersHashRequest => 8,
135		Type::StartHeadersHashResponse => 40, // 8+32=40
136		Type::GetHeadersHashesSegment => 41,
137		Type::OutputHeadersHashesSegment => 2 * max_block_size(),
138		Type::GetOutputBitmapSegment => 41,
139		Type::OutputBitmapSegment => 2 * max_block_size(),
140		Type::GetOutputSegment => 41,
141		Type::OutputSegment => 2 * max_block_size(),
142		Type::GetRangeProofSegment => 41,
143		Type::RangeProofSegment => 2 * max_block_size(),
144		Type::GetKernelSegment => 41,
145		Type::KernelSegment => 2 * max_block_size(),
146		Type::StartPibdSyncRequest => 40, // 32+8=40
147		Type::HasAnotherArchiveHeader => 40,
148		Type::PibdSyncState => 72, // 32 + 8 + 32 = 72
149	}
150}
151
152fn magic() -> [u8; 2] {
153	match global::get_chain_type() {
154		global::ChainTypes::Floonet => FLOONET_MAGIC,
155		global::ChainTypes::Mainnet => MAINNET_MAGIC,
156		_ => OTHER_MAGIC,
157	}
158}
159
160pub struct Msg {
161	header: MsgHeader,
162	body: Vec<u8>,
163	attachment: Option<File>,
164	version: ProtocolVersion,
165}
166
167impl Msg {
168	pub fn new<T: Writeable>(
169		msg_type: Type,
170		msg: T,
171		version: ProtocolVersion,
172	) -> Result<Msg, Error> {
173		let body = ser::ser_vec(&msg, version)?;
174		Ok(Msg {
175			header: MsgHeader::new(msg_type, body.len() as u64),
176			body,
177			attachment: None,
178			version,
179		})
180	}
181
182	pub fn add_attachment(&mut self, attachment: File) {
183		self.attachment = Some(attachment)
184	}
185}
186
187/// Read a header from the provided stream without blocking if the
188/// underlying stream is async. Typically headers will be polled for, so
189/// we do not want to block.
190///
191/// Note: We return a MsgHeaderWrapper here as we may encounter an unknown msg type.
192///
193pub fn read_header<R: Read>(
194	stream: &mut R,
195	version: ProtocolVersion,
196) -> Result<MsgHeaderWrapper, Error> {
197	let mut head = vec![0u8; MsgHeader::LEN];
198	stream.read_exact(&mut head)?;
199	let header: MsgHeaderWrapper =
200		ser::deserialize(&mut &head[..], version, DeserializationMode::default())?;
201	Ok(header)
202}
203
204/// Read a single item from the provided stream, always blocking until we
205/// have a result (or timeout).
206/// Returns the item and the total bytes read.
207pub fn read_item<T: Readable, R: Read>(
208	stream: &mut R,
209	version: ProtocolVersion,
210) -> Result<(T, u64), Error> {
211	let mut reader = StreamingReader::new(stream, version);
212	let res = T::read(&mut reader)?;
213	Ok((res, reader.total_bytes_read()))
214}
215
216/// Read a message body from the provided stream, always blocking
217/// until we have a result (or timeout).
218pub fn read_body<T: Readable, R: Read>(
219	h: &MsgHeader,
220	stream: &mut R,
221	version: ProtocolVersion,
222) -> Result<T, Error> {
223	let mut body = vec![0u8; h.msg_len as usize];
224	stream.read_exact(&mut body)?;
225	ser::deserialize(&mut &body[..], version, DeserializationMode::default()).map_err(From::from)
226}
227
228/// Read (an unknown) message from the provided stream and discard it.
229pub fn read_discard<R: Read>(msg_len: u64, stream: &mut R) -> Result<(), Error> {
230	let mut buffer = vec![0u8; msg_len as usize];
231	stream.read_exact(&mut buffer)?;
232	Ok(())
233}
234
235/// Reads a full message from the underlying stream.
236pub fn read_message<T: Readable, R: Read>(
237	stream: &mut R,
238	version: ProtocolVersion,
239	msg_type: Type,
240) -> Result<T, Error> {
241	match read_header(stream, version)? {
242		MsgHeaderWrapper::Known(header) => {
243			if header.msg_type == msg_type {
244				read_body(&header, stream, version)
245			} else {
246				Err(Error::BadMessage)
247			}
248		}
249		MsgHeaderWrapper::Unknown(msg_len, _) => {
250			read_discard(msg_len, stream)?;
251			Err(Error::BadMessage)
252		}
253	}
254}
255
256pub fn write_message<W: Write>(
257	stream: &mut W,
258	msgs: &Vec<Msg>,
259	tracker: Arc<Tracker>,
260) -> Result<(), Error> {
261	// Introduce a delay so messages are spaced at least 150ms apart.
262	// This gives a max msg rate of 60000/150 = 400 messages per minute.
263	// Exceeding 500 messages per minute will result in being banned as abusive.
264	if let Some(elapsed) = tracker.sent_bytes.read().elapsed_since_last_msg() {
265		let min_interval: u64 = 150;
266		let sleep_ms = min_interval.saturating_sub(elapsed);
267		if sleep_ms > 0 {
268			thread::sleep(Duration::from_millis(sleep_ms))
269		}
270	}
271
272	// sending tmp buffer.
273	let mut tmp_buf: Vec<u8> = vec![];
274
275	for msg in msgs {
276		tmp_buf.extend(ser::ser_vec(&msg.header, msg.version)?);
277		tmp_buf.extend(&msg.body[..]);
278		if let Some(file) = &msg.attachment {
279			// finalize what we have before attachments...
280			if !tmp_buf.is_empty() {
281				stream.write_all(&tmp_buf[..])?;
282				tracker.inc_sent(tmp_buf.len() as u64);
283				tmp_buf.clear();
284			}
285			let mut file = file.try_clone()?;
286			let mut buf = [0u8; 8000];
287			loop {
288				match file.read(&mut buf[..]) {
289					Ok(0) => break,
290					Ok(n) => {
291						stream.write_all(&buf[..n])?;
292						// Increase sent bytes "quietly" without incrementing the counter.
293						// (In a loop here for the single attachment).
294						tracker.inc_quiet_sent(n as u64);
295					}
296					Err(e) => return Err(From::from(e)),
297				}
298			}
299		}
300	}
301
302	if !tmp_buf.is_empty() {
303		stream.write_all(&tmp_buf[..])?;
304		tracker.inc_sent(tmp_buf.len() as u64);
305		tmp_buf.clear();
306	}
307
308	Ok(())
309}
310
311/// A wrapper around a message header. If the header is for an unknown msg type
312/// then we will be unable to parse the msg itself (just a bunch of random bytes).
313/// But we need to know how many bytes to discard to discard the full message.
314#[derive(Clone)]
315pub enum MsgHeaderWrapper {
316	/// A "known" msg type with deserialized msg header.
317	Known(MsgHeader),
318	/// An unknown msg type with corresponding msg size in bytes.
319	Unknown(u64, u8),
320}
321
322/// Header of any protocol message, used to identify incoming messages.
323#[derive(Clone)]
324pub struct MsgHeader {
325	magic: [u8; 2],
326	/// Type of the message.
327	pub msg_type: Type,
328	/// Total length of the message in bytes.
329	pub msg_len: u64,
330}
331
332impl MsgHeader {
333	// 2 magic bytes + 1 type byte + 8 bytes (msg_len)
334	pub const LEN: usize = 2 + 1 + 8;
335
336	/// Creates a new message header.
337	pub fn new(msg_type: Type, len: u64) -> MsgHeader {
338		MsgHeader {
339			magic: magic(),
340			msg_type: msg_type,
341			msg_len: len,
342		}
343	}
344}
345
346impl Writeable for MsgHeader {
347	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
348		ser_multiwrite!(
349			writer,
350			[write_u8, self.magic[0]],
351			[write_u8, self.magic[1]],
352			[write_u8, self.msg_type as u8],
353			[write_u64, self.msg_len]
354		);
355		Ok(())
356	}
357}
358
359impl Readable for MsgHeaderWrapper {
360	fn read<R: Reader>(reader: &mut R) -> Result<MsgHeaderWrapper, ser::Error> {
361		let m = magic();
362		reader.expect_u8(m[0])?;
363		reader.expect_u8(m[1])?;
364
365		// Read the msg header.
366		// We do not yet know if the msg type is one we support locally.
367		let (t, msg_len) = ser_multiread!(reader, read_u8, read_u64);
368
369		// Attempt to convert the msg type byte into one of our known msg type enum variants.
370		// Check the msg_len while we are at it.
371		match Type::from_u8(t) {
372			Some(msg_type) => {
373				// TODO 4x the limits for now to leave ourselves space to change things.
374				let max_len = max_msg_size(msg_type) * 4;
375				if msg_len > max_len {
376					let err_msg = format!(
377						"Too large read {:?}, max_len: {}, msg_len: {}.",
378						msg_type, max_len, msg_len
379					);
380					error!("{}", err_msg);
381					return Err(ser::Error::TooLargeReadErr(err_msg));
382				}
383
384				Ok(MsgHeaderWrapper::Known(MsgHeader {
385					magic: m,
386					msg_type,
387					msg_len,
388				}))
389			}
390			None => {
391				// Unknown msg type, but we still want to limit how big the msg is.
392				let max_len = default_max_msg_size() * 4;
393				if msg_len > max_len {
394					let err_msg = format!(
395						"Too large read (unknown msg type) {:?}, max_len: {}, msg_len: {}.",
396						t, max_len, msg_len
397					);
398					error!("{}", err_msg);
399					return Err(ser::Error::TooLargeReadErr(err_msg));
400				}
401
402				Ok(MsgHeaderWrapper::Unknown(msg_len, t))
403			}
404		}
405	}
406}
407
408/// First part of a handshake, sender advertises its version and
409/// characteristics.
410pub struct Hand {
411	/// protocol version of the sender
412	pub version: ProtocolVersion,
413	/// capabilities of the sender
414	pub capabilities: Capabilities,
415	/// randomly generated for each handshake, helps detect self
416	pub nonce: u64,
417	/// genesis block of our chain, only connect to peers on the same chain
418	pub genesis: Hash,
419	/// total difficulty accumulated by the sender, used to check whether sync
420	/// may be needed
421	pub total_difficulty: Difficulty,
422	/// network address of the sender
423	pub sender_addr: PeerAddr,
424	/// network address of the receiver
425	pub receiver_addr: PeerAddr,
426	/// name of version of the software
427	pub user_agent: String,
428	/// base fee (For protocol version 4)
429	pub tx_fee_base: u64,
430}
431
432impl Writeable for Hand {
433	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
434		self.version.write(writer)?;
435		ser_multiwrite!(
436			writer,
437			[write_u32, self.capabilities.bits()],
438			[write_u64, self.nonce]
439		);
440		self.total_difficulty.write(writer)?;
441		self.sender_addr.write(writer)?;
442		self.receiver_addr.write(writer)?;
443		if self.user_agent.len() > 10_000 {
444			return Err(ser::Error::TooLargeWriteErr(format!(
445				"Unreasonable long User Agent. UA length is {}",
446				self.user_agent.len()
447			)));
448		}
449		writer.write_bytes(&self.user_agent)?;
450		self.genesis.write(writer)?;
451		if self.version.value() > 3 {
452			writer.write_u64(self.tx_fee_base)?;
453		}
454		Ok(())
455	}
456}
457
458impl Readable for Hand {
459	fn read<R: Reader>(reader: &mut R) -> Result<Hand, ser::Error> {
460		let version = ProtocolVersion::read(reader)?;
461		let (capab, nonce) = ser_multiread!(reader, read_u32, read_u64);
462		let capabilities = Capabilities::from_bits_truncate(capab);
463		let total_difficulty = Difficulty::read(reader)?;
464		let sender_addr = PeerAddr::read(reader)?;
465		let receiver_addr = PeerAddr::read(reader)?;
466		let ua = reader.read_bytes_len_prefix()?;
467		let user_agent = String::from_utf8(ua)
468			.map_err(|e| ser::Error::CorruptedData(format!("Fail to read User Agent, {}", e)))?;
469		let genesis = Hash::read(reader)?;
470		let tx_fee_base = if version.value() > 3 {
471			reader.read_u64()?
472		} else {
473			// Default base fee before we start lowering it.
474			consensus::MILLI_MWC
475		};
476		Ok(Hand {
477			version,
478			capabilities,
479			nonce,
480			genesis,
481			total_difficulty,
482			sender_addr,
483			receiver_addr,
484			user_agent,
485			tx_fee_base,
486		})
487	}
488}
489
490/// Second part of a handshake, receiver of the first part replies with its own
491/// version and characteristics.
492pub struct Shake {
493	/// sender version
494	pub version: ProtocolVersion,
495	/// sender capabilities
496	pub capabilities: Capabilities,
497	/// genesis block of our chain, only connect to peers on the same chain
498	pub genesis: Hash,
499	/// total difficulty accumulated by the sender, used to check whether sync
500	/// may be needed
501	pub total_difficulty: Difficulty,
502	/// name of version of the software
503	pub user_agent: String,
504	/// base fee (For protocol version 4)
505	pub tx_fee_base: u64,
506}
507
508impl Writeable for Shake {
509	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
510		writer.protocol_version().write(writer)?;
511		writer.write_u32(self.capabilities.bits())?;
512		self.total_difficulty.write(writer)?;
513		if self.user_agent.len() > 10_000 {
514			return Err(ser::Error::TooLargeWriteErr(format!(
515				"Unreasonable long User Agent. UA length is {}",
516				self.user_agent.len()
517			)));
518		}
519		writer.write_bytes(&self.user_agent)?;
520		self.genesis.write(writer)?;
521		if writer.protocol_version().value() > 3 {
522			writer.write_u64(self.tx_fee_base)?;
523		}
524		Ok(())
525	}
526}
527
528impl Readable for Shake {
529	fn read<R: Reader>(reader: &mut R) -> Result<Shake, ser::Error> {
530		let version = ProtocolVersion::read(reader)?;
531		let capab = reader.read_u32()?;
532		let capabilities = Capabilities::from_bits_truncate(capab);
533		let total_difficulty = Difficulty::read(reader)?;
534		let ua = reader.read_bytes_len_prefix()?;
535		let user_agent = String::from_utf8(ua)
536			.map_err(|e| ser::Error::CorruptedData(format!("Fail to read User Agent, {}", e)))?;
537		let genesis = Hash::read(reader)?;
538		let tx_fee_base = if version.value() > 3 {
539			reader.read_u64()?
540		} else {
541			// Default base fee before we start lowering it.
542			consensus::MILLI_MWC
543		};
544		Ok(Shake {
545			version,
546			capabilities,
547			genesis,
548			total_difficulty,
549			user_agent,
550			tx_fee_base,
551		})
552	}
553}
554
555/// Ask for other peers addresses, required for network discovery.
556#[derive(Debug)]
557pub struct GetPeerAddrs {
558	/// Filters on the capabilities we'd like the peers to have
559	pub capabilities: Capabilities,
560}
561
562impl Writeable for GetPeerAddrs {
563	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
564		writer.write_u32(self.capabilities.bits())
565	}
566}
567
568impl Readable for GetPeerAddrs {
569	fn read<R: Reader>(reader: &mut R) -> Result<GetPeerAddrs, ser::Error> {
570		let capab = reader.read_u32()?;
571		let capabilities = Capabilities::from_bits_truncate(capab);
572		Ok(GetPeerAddrs { capabilities })
573	}
574}
575
576/// Peer addresses we know of that are fresh enough, in response to
577/// GetPeerAddrs.
578#[derive(Debug, Clone, Serialize, PartialEq)]
579pub struct PeerAddrs {
580	pub peers: Vec<PeerAddr>,
581}
582
583impl Writeable for PeerAddrs {
584	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
585		if self.peers.len() > MAX_PEER_ADDRS as usize {
586			return Err(ser::Error::TooLargeWriteErr(
587				"peer.len larger then the limit".to_string(),
588			));
589		}
590		writer.write_u32(self.peers.len() as u32)?;
591		for p in &self.peers {
592			p.write(writer)?;
593		}
594		Ok(())
595	}
596}
597
598impl Readable for PeerAddrs {
599	fn read<R: Reader>(reader: &mut R) -> Result<PeerAddrs, ser::Error> {
600		let peer_count = reader.read_u32()?;
601		if peer_count > MAX_PEER_ADDRS {
602			return Err(ser::Error::TooLargeReadErr(
603				"peer_count larger then the limit".to_string(),
604			));
605		} else if peer_count == 0 {
606			return Ok(PeerAddrs { peers: vec![] });
607		}
608		let mut peers = Vec::with_capacity(peer_count as usize);
609		for _ in 0..peer_count {
610			peers.push(PeerAddr::read(reader)?);
611		}
612		Ok(PeerAddrs { peers })
613	}
614}
615
616impl IntoIterator for PeerAddrs {
617	type Item = PeerAddr;
618	type IntoIter = std::vec::IntoIter<Self::Item>;
619	fn into_iter(self) -> Self::IntoIter {
620		self.peers.into_iter()
621	}
622}
623
624impl Default for PeerAddrs {
625	fn default() -> Self {
626		PeerAddrs { peers: vec![] }
627	}
628}
629
630impl PeerAddrs {
631	pub fn as_slice(&self) -> &[PeerAddr] {
632		self.peers.as_slice()
633	}
634
635	pub fn contains(&self, addr: &PeerAddr) -> bool {
636		self.peers.contains(addr)
637	}
638
639	pub fn difference(&self, other: &[PeerAddr]) -> PeerAddrs {
640		let peers = self
641			.peers
642			.iter()
643			.filter(|x| !other.contains(x))
644			.cloned()
645			.collect();
646		PeerAddrs { peers }
647	}
648}
649
650/// We found some issue in the communication, sending an error back, usually
651/// followed by closing the connection.
652pub struct PeerError {
653	/// error code
654	pub code: u32,
655	/// slightly more user friendly message
656	pub message: String,
657}
658
659impl Writeable for PeerError {
660	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
661		if self.message.len() > 10_000 {
662			return Err(ser::Error::TooLargeWriteErr(format!(
663				"Unreasonable long PeerError message. length is {}",
664				self.message.len()
665			)));
666		}
667		ser_multiwrite!(writer, [write_u32, self.code], [write_bytes, &self.message]);
668		Ok(())
669	}
670}
671
672impl Readable for PeerError {
673	fn read<R: Reader>(reader: &mut R) -> Result<PeerError, ser::Error> {
674		let code = reader.read_u32()?;
675		let msg = reader.read_bytes_len_prefix()?;
676		let message = String::from_utf8(msg)
677			.map_err(|e| ser::Error::CorruptedData(format!("Fail to read message, {}", e)))?;
678		Ok(PeerError {
679			code: code,
680			message: message,
681		})
682	}
683}
684
685/// Serializable wrapper for the block locator.
686#[derive(Debug)]
687pub struct Locator {
688	pub hashes: Vec<Hash>,
689}
690
691impl Writeable for Locator {
692	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
693		if self.hashes.len() > MAX_LOCATORS as usize {
694			return Err(ser::Error::TooLargeWriteErr(format!(
695				"Storing too many locators: {}",
696				self.hashes.len()
697			)));
698		}
699		writer.write_u8(self.hashes.len() as u8)?;
700		for h in &self.hashes {
701			h.write(writer)?
702		}
703		Ok(())
704	}
705}
706
707impl Readable for Locator {
708	fn read<R: Reader>(reader: &mut R) -> Result<Locator, ser::Error> {
709		let len = reader.read_u8()?;
710		if len > (MAX_LOCATORS as u8) {
711			return Err(ser::Error::TooLargeReadErr(format!(
712				"Get too many locators: {}",
713				len
714			)));
715		}
716		let mut hashes = Vec::with_capacity(len as usize);
717		for _ in 0..len {
718			hashes.push(Hash::read(reader)?);
719		}
720		Ok(Locator { hashes: hashes })
721	}
722}
723
724/// Serializable wrapper for a list of block headers.
725pub struct Headers {
726	pub headers: Vec<BlockHeader>,
727}
728
729impl Writeable for Headers {
730	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
731		writer.write_u16(self.headers.len() as u16)?;
732		for h in &self.headers {
733			h.write(writer)?
734		}
735		Ok(())
736	}
737}
738
739#[derive(Debug)]
740pub struct Ping {
741	/// total difficulty accumulated by the sender, used to check whether sync
742	/// may be needed
743	pub total_difficulty: Difficulty,
744	/// total height
745	pub height: u64,
746}
747
748impl Writeable for Ping {
749	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
750		self.total_difficulty.write(writer)?;
751		self.height.write(writer)?;
752		Ok(())
753	}
754}
755
756impl Readable for Ping {
757	fn read<R: Reader>(reader: &mut R) -> Result<Ping, ser::Error> {
758		let total_difficulty = Difficulty::read(reader)?;
759		let height = reader.read_u64()?;
760		Ok(Ping {
761			total_difficulty,
762			height,
763		})
764	}
765}
766
767#[derive(Debug)]
768pub struct Pong {
769	/// total difficulty accumulated by the sender, used to check whether sync
770	/// may be needed
771	pub total_difficulty: Difficulty,
772	/// height accumulated by sender
773	pub height: u64,
774}
775
776impl Writeable for Pong {
777	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
778		self.total_difficulty.write(writer)?;
779		self.height.write(writer)?;
780		Ok(())
781	}
782}
783
784impl Readable for Pong {
785	fn read<R: Reader>(reader: &mut R) -> Result<Pong, ser::Error> {
786		let total_difficulty = Difficulty::read(reader)?;
787		let height = reader.read_u64()?;
788		Ok(Pong {
789			total_difficulty,
790			height,
791		})
792	}
793}
794
795#[derive(Debug)]
796pub struct BanReason {
797	/// the reason for the ban
798	pub ban_reason: ReasonForBan,
799}
800
801impl Writeable for BanReason {
802	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
803		let ban_reason_i32 = self.ban_reason as i32;
804		ban_reason_i32.write(writer)?;
805		Ok(())
806	}
807}
808
809impl Readable for BanReason {
810	fn read<R: Reader>(reader: &mut R) -> Result<BanReason, ser::Error> {
811		let ban_reason_i32 = match reader.read_i32() {
812			Ok(h) => h,
813			Err(_) => 0,
814		};
815
816		let ban_reason = ReasonForBan::from_i32(ban_reason_i32).ok_or(
817			ser::Error::CorruptedData("Fail to read ban reason".to_string()),
818		)?;
819
820		Ok(BanReason { ban_reason })
821	}
822}
823
824#[derive(Debug)]
825pub struct HashHeadersData {
826	/// Height of the archive block to what we are expecting to get headers hashes
827	pub archive_height: u64,
828}
829
830impl Writeable for HashHeadersData {
831	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
832		writer.write_u64(self.archive_height)?;
833		Ok(())
834	}
835}
836
837impl Readable for HashHeadersData {
838	fn read<R: Reader>(reader: &mut R) -> Result<HashHeadersData, ser::Error> {
839		Ok(HashHeadersData {
840			archive_height: reader.read_u64()?,
841		})
842	}
843}
844
845#[derive(Debug)]
846pub struct StartHeadersHashResponse {
847	pub archive_height: u64,
848	pub headers_root_hash: Hash,
849}
850
851impl Writeable for StartHeadersHashResponse {
852	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
853		writer.write_u64(self.archive_height)?;
854		self.headers_root_hash.write(writer)?;
855		Ok(())
856	}
857}
858
859impl Readable for StartHeadersHashResponse {
860	fn read<R: Reader>(reader: &mut R) -> Result<StartHeadersHashResponse, ser::Error> {
861		Ok(StartHeadersHashResponse {
862			archive_height: reader.read_u64()?,
863			headers_root_hash: Hash::read(reader)?,
864		})
865	}
866}
867
868pub struct HeadersHashSegmentResponse {
869	/// The hash of the block the MMR is associated with
870	pub headers_root_hash: Hash,
871	/// The segment response
872	pub response: SegmentResponse<Hash>,
873}
874
875impl Readable for HeadersHashSegmentResponse {
876	fn read<R: Reader>(reader: &mut R) -> Result<Self, ser::Error> {
877		let headers_root_hash = Readable::read(reader)?;
878		let response = Readable::read(reader)?;
879		Ok(Self {
880			headers_root_hash,
881			response,
882		})
883	}
884}
885
886impl Writeable for HeadersHashSegmentResponse {
887	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
888		Writeable::write(&self.headers_root_hash, writer)?;
889		Writeable::write(&self.response, writer)
890	}
891}
892
893/// Request to get PIBD sync request
894#[derive(Debug)]
895pub struct ArchiveHeaderData {
896	/// Hash of the block for which the txhashset should be provided
897	pub hash: Hash,
898	/// Height of the corresponding block
899	pub height: u64,
900}
901
902impl Writeable for ArchiveHeaderData {
903	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
904		self.hash.write(writer)?;
905		writer.write_u64(self.height)?;
906		Ok(())
907	}
908}
909
910impl Readable for ArchiveHeaderData {
911	fn read<R: Reader>(reader: &mut R) -> Result<ArchiveHeaderData, ser::Error> {
912		Ok(ArchiveHeaderData {
913			hash: Hash::read(reader)?,
914			height: reader.read_u64()?,
915		})
916	}
917}
918
919#[derive(Debug)]
920pub struct PibdSyncState {
921	/// Hash of the block for which the txhashset should be provided
922	pub header_hash: Hash,
923	/// Height of the corresponding block
924	pub header_height: u64,
925	/// output bitmap root hash
926	pub output_bitmap_root: Hash,
927}
928
929impl Writeable for PibdSyncState {
930	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
931		self.header_hash.write(writer)?;
932		writer.write_u64(self.header_height)?;
933		self.output_bitmap_root.write(writer)?;
934		Ok(())
935	}
936}
937
938impl Readable for PibdSyncState {
939	fn read<R: Reader>(reader: &mut R) -> Result<PibdSyncState, ser::Error> {
940		Ok(PibdSyncState {
941			header_hash: Hash::read(reader)?,
942			header_height: reader.read_u64()?,
943			output_bitmap_root: Hash::read(reader)?,
944		})
945	}
946}
947
948/// Request to get a segment of a (P)MMR at a particular block.
949#[derive(Debug)]
950pub struct SegmentRequest {
951	/// The hash of the block the MMR is associated with
952	pub block_hash: Hash,
953	/// The identifier of the requested segment
954	pub identifier: SegmentIdentifier,
955}
956
957impl Readable for SegmentRequest {
958	fn read<R: Reader>(reader: &mut R) -> Result<Self, ser::Error> {
959		let block_hash = Readable::read(reader)?;
960		let identifier = Readable::read(reader)?;
961		Ok(Self {
962			block_hash,
963			identifier,
964		})
965	}
966}
967
968impl Writeable for SegmentRequest {
969	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
970		Writeable::write(&self.block_hash, writer)?;
971		Writeable::write(&self.identifier, writer)
972	}
973}
974
975/// Response to a (P)MMR segment request.
976pub struct SegmentResponse<T> {
977	/// The hash of the archive header - block the MMR is associated with
978	pub block_hash: Hash,
979	/// The MMR segment
980	pub segment: Segment<T>,
981}
982
983impl<T: Readable> Readable for SegmentResponse<T> {
984	fn read<R: Reader>(reader: &mut R) -> Result<Self, ser::Error> {
985		let block_hash = Readable::read(reader)?;
986		let segment = Readable::read(reader)?;
987		Ok(Self {
988			block_hash,
989			segment,
990		})
991	}
992}
993
994impl<T: Writeable> Writeable for SegmentResponse<T> {
995	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
996		Writeable::write(&self.block_hash, writer)?;
997		Writeable::write(&self.segment, writer)
998	}
999}
1000
1001/// Response to an output PMMR segment request.
1002pub struct OutputSegmentResponse {
1003	/// The segment response
1004	pub response: SegmentResponse<OutputIdentifier>,
1005}
1006
1007impl Readable for OutputSegmentResponse {
1008	fn read<R: Reader>(reader: &mut R) -> Result<Self, ser::Error> {
1009		let response = Readable::read(reader)?;
1010		Ok(Self { response })
1011	}
1012}
1013
1014impl Writeable for OutputSegmentResponse {
1015	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
1016		Writeable::write(&self.response, writer)
1017	}
1018}
1019
1020/// Response to an output bitmap MMR segment request.
1021pub struct OutputBitmapSegmentResponse {
1022	/// The hash of the block the MMR is associated with
1023	pub block_hash: Hash,
1024	/// The MMR segment
1025	pub segment: BitmapSegment,
1026}
1027
1028impl Readable for OutputBitmapSegmentResponse {
1029	fn read<R: Reader>(reader: &mut R) -> Result<Self, ser::Error> {
1030		let block_hash = Readable::read(reader)?;
1031		let segment = Readable::read(reader)?;
1032		Ok(Self {
1033			block_hash,
1034			segment,
1035		})
1036	}
1037}
1038
1039impl Writeable for OutputBitmapSegmentResponse {
1040	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
1041		Writeable::write(&self.block_hash, writer)?;
1042		Writeable::write(&self.segment, writer)
1043	}
1044}
1045
/// A message together with its payload, one variant per kind of message
/// represented in this module.
///
/// `Display` renders a short human-readable form of each variant; `Debug`
/// wraps that same text in `Consume(...)`.
pub enum Message {
	/// Carries a single `u8`.
	/// NOTE(review): presumably the unrecognized message type code — confirm.
	Unknown(u8),
	Ping(Ping),
	Pong(Pong),
	BanReason(BanReason),

	// Transaction related variants. The `Get*`/kernel variants carry only a hash.
	TransactionKernel(Hash),
	GetTransaction(Hash),
	Transaction(Transaction),
	StemTransaction(Transaction),

	// Block and compact block variants; requests carry only a hash.
	GetBlock(Hash),
	Block(UntrustedBlock),
	GetCompactBlock(Hash),
	CompactBlock(UntrustedCompactBlock),

	// Header variants.
	GetHeaders(Locator),
	Header(UntrustedBlockHeader),
	/// A batch of headers plus the count of headers remaining after it
	/// (see `HeadersData`).
	Headers(HeadersData),

	// Peer address exchange.
	GetPeerAddrs(GetPeerAddrs),
	PeerAddrs(PeerAddrs),

	// Txhashset archive request/response.
	TxHashSetRequest(ArchiveHeaderData),
	TxHashSetArchive(TxHashSetArchive),
	/// An attachment update, optionally accompanied by bytes.
	/// NOTE(review): presumably the bytes are a chunk of attachment data — confirm.
	Attachment(AttachmentUpdate, Option<Bytes>),
	TorAddress(TorAddress),

	// Headers hash variants.
	StartHeadersHashRequest(HashHeadersData),
	StartHeadersHashResponse(StartHeadersHashResponse),
	GetHeadersHashesSegment(SegmentRequest),
	OutputHeadersHashesSegment(HeadersHashSegmentResponse),

	// PIBD sync and segment request/response variants.
	StartPibdSyncRequest(ArchiveHeaderData),
	PibdSyncState(PibdSyncState),
	GetOutputBitmapSegment(SegmentRequest),
	OutputBitmapSegment(OutputBitmapSegmentResponse),
	GetOutputSegment(SegmentRequest),
	OutputSegment(OutputSegmentResponse),
	GetRangeProofSegment(SegmentRequest),
	RangeProofSegment(SegmentResponse<RangeProof>),
	GetKernelSegment(SegmentRequest),
	KernelSegment(SegmentResponse<TxKernel>),
	HasAnotherArchiveHeader(ArchiveHeaderData),
}
1084
/// We receive 512 headers from a peer.
/// But we process them in smaller batches of 32 headers.
/// HeadersData wraps the current batch and a count of the headers remaining after this batch.
pub struct HeadersData {
	/// Batch of headers currently being processed.
	pub headers: Vec<BlockHeader>,
	/// Number of headers still to be processed after this current batch.
	/// 0 indicates this is the final batch from the larger set of headers received from the peer.
	pub remaining: u64,
}
1095
1096impl fmt::Display for Message {
1097	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1098		match self {
1099			Message::Unknown(i) => write!(f, "Unknown({})", i),
1100			Message::Ping(ping) => write!(f, "{:?}", ping),
1101			Message::Pong(pong) => write!(f, "{:?}", pong),
1102			Message::BanReason(ban_reason) => write!(f, "{:?}", ban_reason),
1103			Message::TransactionKernel(hash) => write!(f, "TransactionKernel({})", hash),
1104			Message::GetTransaction(hash) => write!(f, "GetTransaction({})", hash),
1105			Message::Transaction(tx) => write!(f, "{:?}", tx),
1106			Message::StemTransaction(tx) => write!(f, "STEM[{:?}]", tx),
1107			Message::GetBlock(hash) => write!(f, "GetBlock({})", hash),
1108			Message::Block(block) => write!(f, "{:?}", block),
1109			Message::GetCompactBlock(hash) => write!(f, "GetCompactBlock({})", hash),
1110			Message::CompactBlock(com_block) => write!(f, "{:?}", com_block),
1111			Message::GetHeaders(loc) => write!(f, "GetHeaders({:?})", loc),
1112			Message::Header(header) => write!(f, "Header({:?})", header),
1113			Message::Headers(headers) => match headers.headers.first() {
1114				Some(header) => write!(
1115					f,
1116					"Headers(H:{} Num:{}, Rem:{})",
1117					header.height,
1118					headers.headers.len(),
1119					headers.remaining
1120				),
1121				None => write!(f, "Headers(EMPTY)"),
1122			},
1123			Message::GetPeerAddrs(peer_addr) => write!(f, "{:?}", peer_addr),
1124			Message::PeerAddrs(peer_addrs) => write!(f, "{:?}", peer_addrs),
1125			Message::TxHashSetRequest(arch) => write!(f, "TxHashSetRequest({:?})", arch),
1126			Message::TxHashSetArchive(hash_set) => write!(f, "{:?}", hash_set),
1127			Message::Attachment(meta, _) => write!(f, "Attachment({:?})", meta),
1128			Message::TorAddress(addr) => write!(f, "{:?}", addr),
1129			Message::StartHeadersHashRequest(req) => {
1130				write!(f, "StartHeadersHashRequest({:?})", req)
1131			}
1132			Message::StartHeadersHashResponse(resp) => {
1133				write!(f, "StartHeadersHashResponse({:?})", resp)
1134			}
1135			Message::GetHeadersHashesSegment(seg_req) => {
1136				write!(f, "GetHeadersHashesSegment({:?})", seg_req)
1137			}
1138			Message::OutputHeadersHashesSegment(segm) => write!(
1139				f,
1140				"OutputHeadersHashesSegment({:?}, root:{})",
1141				segm.response.segment.id(),
1142				segm.headers_root_hash
1143			),
1144			Message::GetOutputBitmapSegment(segm) => {
1145				write!(f, "GetOutputBitmapSegment({:?})", segm)
1146			}
1147			Message::OutputBitmapSegment(segm) => write!(
1148				f,
1149				"OutputBitmapSegment({:?}, root:{})",
1150				segm.segment.identifier, segm.block_hash
1151			),
1152			Message::GetOutputSegment(segm) => write!(f, "GetOutputSegment({:?})", segm),
1153			Message::OutputSegment(segm) => write!(
1154				f,
1155				"OutputSegment({:?}, root:{})",
1156				segm.response.segment.id(),
1157				segm.response.block_hash
1158			),
1159			Message::GetRangeProofSegment(segm) => write!(f, "GetRangeProofSegment({:?})", segm),
1160			Message::RangeProofSegment(segm) => write!(
1161				f,
1162				"RangeProofSegment({:?}, root:{})",
1163				segm.segment.id(),
1164				segm.block_hash
1165			),
1166			Message::GetKernelSegment(segm) => write!(f, "GetKernelSegment({:?})", segm),
1167			Message::KernelSegment(segm) => write!(
1168				f,
1169				"KernelSegment({:?}, root:{})",
1170				segm.segment.id(),
1171				segm.block_hash
1172			),
1173			Message::PibdSyncState(state) => write!(f, "{:?}", state),
1174			Message::StartPibdSyncRequest(dt) => write!(f, "StartPibdSyncRequest({:?})", dt),
1175			Message::HasAnotherArchiveHeader(dt) => write!(f, "HasAnotherArchiveHeader({:?})", dt),
1176		}
1177	}
1178}
1179
1180impl fmt::Debug for Message {
1181	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1182		write!(f, "Consume({})", self)
1183	}
1184}
1185
/// NOTE(review): presumably the outcome of handling an incoming message,
/// telling the connection what to do next — confirm against the callers.
pub enum Consumed {
	/// Carries a `Msg`.
	/// NOTE(review): presumably a reply to send back to the peer — confirm.
	Response(Msg),
	/// Carries shared attachment metadata together with a `File`.
	Attachment(Arc<AttachmentMeta>, File),
	/// Carries no payload.
	None,
	/// Carries no payload.
	/// NOTE(review): presumably requests that the peer be disconnected — confirm.
	Disconnect,
}
1192
1193impl fmt::Debug for Consumed {
1194	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1195		match self {
1196			Consumed::Response(msg) => write!(f, "Consumed::Response({:?})", msg.header.msg_type),
1197			Consumed::Attachment(meta, _) => write!(f, "Consumed::Attachment({:?})", meta.size),
1198			Consumed::None => write!(f, "Consumed::None"),
1199			Consumed::Disconnect => write!(f, "Consumed::Disconnect"),
1200		}
1201	}
1202}
1203
/// Response to a txhashset archive request, must include a zip stream of the
/// archive after the message body.
///
/// The message body itself is serialized as the hash followed by `height`
/// and `bytes`, each written as a `u64`.
#[derive(Debug)]
pub struct TxHashSetArchive {
	/// Hash of the block for which the txhashset is provided
	pub hash: Hash,
	/// Height of the corresponding block
	pub height: u64,
	/// Size in bytes of the archive
	pub bytes: u64,
}
1215
1216impl Writeable for TxHashSetArchive {
1217	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
1218		self.hash.write(writer)?;
1219		ser_multiwrite!(writer, [write_u64, self.height], [write_u64, self.bytes]);
1220		Ok(())
1221	}
1222}
1223
1224impl Readable for TxHashSetArchive {
1225	fn read<R: Reader>(reader: &mut R) -> Result<TxHashSetArchive, ser::Error> {
1226		let hash = Hash::read(reader)?;
1227		let (height, bytes) = ser_multiread!(reader, read_u64, read_u64);
1228
1229		Ok(TxHashSetArchive {
1230			hash,
1231			height,
1232			bytes,
1233		})
1234	}
1235}
1236
/// Message whose only payload is an address held as a `String`.
#[derive(Debug)]
pub struct TorAddress {
	/// The address text, stored exactly as supplied.
	pub address: String,
}

impl TorAddress {
	/// Builds a `TorAddress` message that takes ownership of `address`.
	/// No validation is performed on the string.
	pub fn new(address: String) -> TorAddress {
		Self { address }
	}
}
1248
1249impl Writeable for TorAddress {
1250	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
1251		ser_multiwrite!(writer, [write_bytes, &self.address]);
1252		Ok(())
1253	}
1254}
1255
1256impl Readable for TorAddress {
1257	fn read<R: Reader>(reader: &mut R) -> Result<TorAddress, ser::Error> {
1258		let address = String::from_utf8(reader.read_bytes_len_prefix()?);
1259
1260		match address {
1261			Ok(address) => Ok(TorAddress { address }),
1262			Err(e) => Err(ser::Error::Utf8Conversion(e.to_string())),
1263		}
1264	}
1265}