1use crate::error::GatewayError;
28
29const ETF_VERSION: u8 = 131;
31
32mod tags {
34 pub const SMALL_INTEGER: u8 = 97;
35 pub const INTEGER: u8 = 98;
36 pub const FLOAT: u8 = 99;
37 pub const ATOM: u8 = 100;
38 pub const SMALL_TUPLE: u8 = 104;
39 pub const LARGE_TUPLE: u8 = 105;
40 pub const NIL: u8 = 106;
41 pub const STRING: u8 = 107;
42 pub const LIST: u8 = 108;
43 pub const BINARY: u8 = 109;
44 pub const SMALL_BIG: u8 = 110;
45 pub const LARGE_BIG: u8 = 111;
46 pub const MAP: u8 = 116;
47 pub const ATOM_UTF8: u8 = 118;
48 pub const SMALL_ATOM_UTF8: u8 = 119;
49 pub const NEW_FLOAT: u8 = 70;
50 pub const COMPRESSED: u8 = 80;
51}
52
53#[derive(Debug, Clone, PartialEq)]
55pub enum EtfTerm {
56 SmallInt(u8),
58 Int(i32),
60 BigInt(i128),
62 Float(f64),
64 Atom(String),
66 Tuple(Vec<EtfTerm>),
68 Nil,
70 String(String),
72 List(Vec<EtfTerm>),
74 Binary(Vec<u8>),
76 Map(Vec<(EtfTerm, EtfTerm)>),
78}
79
80pub struct EtfDecoder<'a> {
82 data: &'a [u8],
83 pos: usize,
84}
85
86impl<'a> EtfDecoder<'a> {
87 pub fn new(data: &'a [u8]) -> Self {
89 Self { data, pos: 0 }
90 }
91
92 pub fn decode(data: &[u8]) -> Result<EtfTerm, GatewayError> {
94 let mut decoder = EtfDecoder::new(data);
95 decoder.decode_term()
96 }
97
98 fn decode_term(&mut self) -> Result<EtfTerm, GatewayError> {
100 let version = self.read_u8()?;
102 if version != ETF_VERSION {
103 return Err(GatewayError::JsonDecode(format!(
104 "Invalid ETF version: expected {}, got {}",
105 ETF_VERSION, version
106 )));
107 }
108
109 self.decode_value()
110 }
111
112 fn decode_value(&mut self) -> Result<EtfTerm, GatewayError> {
114 let tag = self.read_u8()?;
115
116 match tag {
117 tags::SMALL_INTEGER => {
118 let value = self.read_u8()?;
119 Ok(EtfTerm::SmallInt(value))
120 }
121
122 tags::INTEGER => {
123 let value = self.read_i32()?;
124 Ok(EtfTerm::Int(value))
125 }
126
127 tags::FLOAT => {
128 let bytes = self.read_bytes(31)?;
130 let s = std::str::from_utf8(bytes)
131 .map_err(|e| GatewayError::JsonDecode(format!("Invalid float string: {}", e)))?
132 .trim_end_matches('\0');
133 let value: f64 = s
134 .parse()
135 .map_err(|e| GatewayError::JsonDecode(format!("Invalid float: {}", e)))?;
136 Ok(EtfTerm::Float(value))
137 }
138
139 tags::NEW_FLOAT => {
140 let bytes = self.read_bytes(8)?;
141 let value =
142 f64::from_be_bytes(bytes.try_into().map_err(|_| {
143 GatewayError::JsonDecode("Invalid float bytes".to_string())
144 })?);
145 Ok(EtfTerm::Float(value))
146 }
147
148 tags::ATOM => {
149 let len = self.read_u16()? as usize;
150 let bytes = self.read_bytes(len)?;
151 let s = String::from_utf8_lossy(bytes).into_owned();
152 Ok(EtfTerm::Atom(s))
153 }
154
155 tags::ATOM_UTF8 => {
156 let len = self.read_u16()? as usize;
157 let bytes = self.read_bytes(len)?;
158 let s = String::from_utf8_lossy(bytes).into_owned();
159 Ok(EtfTerm::Atom(s))
160 }
161
162 tags::SMALL_ATOM_UTF8 => {
163 let len = self.read_u8()? as usize;
164 let bytes = self.read_bytes(len)?;
165 let s = String::from_utf8_lossy(bytes).into_owned();
166 Ok(EtfTerm::Atom(s))
167 }
168
169 tags::SMALL_TUPLE => {
170 let arity = self.read_u8()? as usize;
171 let mut elements = Vec::with_capacity(arity);
172 for _ in 0..arity {
173 elements.push(self.decode_value()?);
174 }
175 Ok(EtfTerm::Tuple(elements))
176 }
177
178 tags::LARGE_TUPLE => {
179 let arity = self.read_u32()? as usize;
180 let mut elements = Vec::with_capacity(arity);
181 for _ in 0..arity {
182 elements.push(self.decode_value()?);
183 }
184 Ok(EtfTerm::Tuple(elements))
185 }
186
187 tags::NIL => Ok(EtfTerm::Nil),
188
189 tags::STRING => {
190 let len = self.read_u16()? as usize;
192 let bytes = self.read_bytes(len)?;
193 let s = String::from_utf8_lossy(bytes).into_owned();
194 Ok(EtfTerm::String(s))
195 }
196
197 tags::LIST => {
198 let len = self.read_u32()? as usize;
199 let mut elements = Vec::with_capacity(len);
200 for _ in 0..len {
201 elements.push(self.decode_value()?);
202 }
203 let _tail = self.decode_value()?;
205 Ok(EtfTerm::List(elements))
206 }
207
208 tags::BINARY => {
209 let len = self.read_u32()? as usize;
210 let bytes = self.read_bytes(len)?;
211 Ok(EtfTerm::Binary(bytes.to_vec()))
212 }
213
214 tags::SMALL_BIG => {
215 let n = self.read_u8()? as usize;
216 let sign = self.read_u8()?;
217 let bytes = self.read_bytes(n)?;
218
219 let mut value: i128 = 0;
220 for (i, &byte) in bytes.iter().enumerate() {
221 value |= (byte as i128) << (i * 8);
222 }
223
224 if sign != 0 {
225 value = -value;
226 }
227
228 Ok(EtfTerm::BigInt(value))
229 }
230
231 tags::LARGE_BIG => {
232 let n = self.read_u32()? as usize;
233 let sign = self.read_u8()?;
234 let bytes = self.read_bytes(n)?;
235
236 let mut value: i128 = 0;
238 for (i, &byte) in bytes.iter().take(16).enumerate() {
239 value |= (byte as i128) << (i * 8);
240 }
241
242 if sign != 0 {
243 value = -value;
244 }
245
246 Ok(EtfTerm::BigInt(value))
247 }
248
249 tags::MAP => {
250 let arity = self.read_u32()? as usize;
251 let mut pairs = Vec::with_capacity(arity);
252 for _ in 0..arity {
253 let key = self.decode_value()?;
254 let value = self.decode_value()?;
255 pairs.push((key, value));
256 }
257 Ok(EtfTerm::Map(pairs))
258 }
259
260 tags::COMPRESSED => {
261 let uncompressed_size = self.read_u32()? as usize;
262 let compressed_data = &self.data[self.pos..];
263
264 use flate2::read::ZlibDecoder;
266 use std::io::Read;
267
268 let mut decoder = ZlibDecoder::new(compressed_data);
269 let mut decompressed = Vec::with_capacity(uncompressed_size);
270 decoder.read_to_end(&mut decompressed).map_err(|e| {
271 GatewayError::JsonDecode(format!("ETF decompression failed: {}", e))
272 })?;
273
274 self.pos = self.data.len();
276
277 let mut inner = EtfDecoder::new(&decompressed);
279 inner.decode_value()
280 }
281
282 _ => Err(GatewayError::JsonDecode(format!(
283 "Unknown ETF tag: {} at position {}",
284 tag,
285 self.pos - 1
286 ))),
287 }
288 }
289
290 #[inline]
292 fn read_u8(&mut self) -> Result<u8, GatewayError> {
293 if self.pos >= self.data.len() {
294 return Err(GatewayError::JsonDecode(
295 "Unexpected end of ETF data".to_string(),
296 ));
297 }
298 let byte = self.data[self.pos];
299 self.pos += 1;
300 Ok(byte)
301 }
302
303 #[inline]
305 fn read_u16(&mut self) -> Result<u16, GatewayError> {
306 if self.pos + 2 > self.data.len() {
307 return Err(GatewayError::JsonDecode(
308 "Unexpected end of ETF data".to_string(),
309 ));
310 }
311 let value = u16::from_be_bytes([self.data[self.pos], self.data[self.pos + 1]]);
312 self.pos += 2;
313 Ok(value)
314 }
315
316 #[inline]
318 fn read_u32(&mut self) -> Result<u32, GatewayError> {
319 if self.pos + 4 > self.data.len() {
320 return Err(GatewayError::JsonDecode(
321 "Unexpected end of ETF data".to_string(),
322 ));
323 }
324 let value = u32::from_be_bytes([
325 self.data[self.pos],
326 self.data[self.pos + 1],
327 self.data[self.pos + 2],
328 self.data[self.pos + 3],
329 ]);
330 self.pos += 4;
331 Ok(value)
332 }
333
334 #[inline]
336 fn read_i32(&mut self) -> Result<i32, GatewayError> {
337 if self.pos + 4 > self.data.len() {
338 return Err(GatewayError::JsonDecode(
339 "Unexpected end of ETF data".to_string(),
340 ));
341 }
342 let value = i32::from_be_bytes([
343 self.data[self.pos],
344 self.data[self.pos + 1],
345 self.data[self.pos + 2],
346 self.data[self.pos + 3],
347 ]);
348 self.pos += 4;
349 Ok(value)
350 }
351
352 fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], GatewayError> {
354 if self.pos + n > self.data.len() {
355 return Err(GatewayError::JsonDecode(
356 "Unexpected end of ETF data".to_string(),
357 ));
358 }
359 let bytes = &self.data[self.pos..self.pos + n];
360 self.pos += n;
361 Ok(bytes)
362 }
363
364 pub fn to_json_value(term: &EtfTerm) -> Result<serde_json::Value, GatewayError> {
369 match term {
370 EtfTerm::SmallInt(n) => Ok(serde_json::Value::Number((*n as i64).into())),
371 EtfTerm::Int(n) => Ok(serde_json::Value::Number((*n as i64).into())),
372 EtfTerm::BigInt(n) => {
373 if *n > i64::MAX as i128 || *n < i64::MIN as i128 {
376 Ok(serde_json::Value::String(n.to_string()))
377 } else {
378 Ok(serde_json::Value::Number((*n as i64).into()))
379 }
380 }
381 EtfTerm::Float(f) => serde_json::Number::from_f64(*f)
382 .map(serde_json::Value::Number)
383 .ok_or_else(|| GatewayError::JsonDecode("Invalid float value".to_string())),
384 EtfTerm::Atom(s) => {
385 match s.as_str() {
387 "nil" | "null" => Ok(serde_json::Value::Null),
388 "true" => Ok(serde_json::Value::Bool(true)),
389 "false" => Ok(serde_json::Value::Bool(false)),
390 _ => Ok(serde_json::Value::String(s.clone())),
391 }
392 }
393 EtfTerm::Tuple(elements) => {
394 let arr: Result<Vec<_>, _> = elements.iter().map(Self::to_json_value).collect();
395 Ok(serde_json::Value::Array(arr?))
396 }
397 EtfTerm::Nil => Ok(serde_json::Value::Null),
398 EtfTerm::String(s) => Ok(serde_json::Value::String(s.clone())),
399 EtfTerm::List(elements) => {
400 let arr: Result<Vec<_>, _> = elements.iter().map(Self::to_json_value).collect();
401 Ok(serde_json::Value::Array(arr?))
402 }
403 EtfTerm::Binary(bytes) => {
404 match String::from_utf8(bytes.clone()) {
406 Ok(s) => Ok(serde_json::Value::String(s)),
407 Err(_) => {
408 use base64::Engine;
410 let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
411 Ok(serde_json::Value::String(encoded))
412 }
413 }
414 }
415 EtfTerm::Map(pairs) => {
416 let mut map = serde_json::Map::new();
417 for (key, value) in pairs {
418 let key_str = match key {
419 EtfTerm::Atom(s) => s.clone(),
420 EtfTerm::Binary(b) => String::from_utf8_lossy(b).into_owned(),
421 EtfTerm::String(s) => s.clone(),
422 _ => {
423 let json_key = Self::to_json_value(key)?;
425 json_key.to_string()
426 }
427 };
428 map.insert(key_str, Self::to_json_value(value)?);
429 }
430 Ok(serde_json::Value::Object(map))
431 }
432 }
433 }
434
435 pub fn to_json_string(term: &EtfTerm) -> Result<String, GatewayError> {
437 let value = Self::to_json_value(term)?;
438 serde_json::to_string(&value).map_err(GatewayError::from)
439 }
440}
441
442#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
444pub enum GatewayEncoding {
445 #[default]
447 Json,
448 Etf,
450}
451
452impl GatewayEncoding {
453 pub fn as_str(&self) -> &'static str {
455 match self {
456 GatewayEncoding::Json => "json",
457 GatewayEncoding::Etf => "etf",
458 }
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use super::*;
465
466 #[test]
467 fn test_decode_small_int() {
468 let data = [131, 97, 42];
470 let term = EtfDecoder::decode(&data).unwrap();
471 assert_eq!(term, EtfTerm::SmallInt(42));
472 }
473
474 #[test]
475 fn test_decode_integer() {
476 let data = [131, 98, 0, 0, 1, 0]; let term = EtfDecoder::decode(&data).unwrap();
479 assert_eq!(term, EtfTerm::Int(256));
480 }
481
482 #[test]
483 fn test_decode_nil() {
484 let data = [131, 106];
485 let term = EtfDecoder::decode(&data).unwrap();
486 assert_eq!(term, EtfTerm::Nil);
487 }
488
489 #[test]
490 fn test_decode_binary() {
491 let data = [131, 109, 0, 0, 0, 5, b'h', b'e', b'l', b'l', b'o'];
493 let term = EtfDecoder::decode(&data).unwrap();
494 assert_eq!(term, EtfTerm::Binary(b"hello".to_vec()));
495 }
496
497 #[test]
498 fn test_decode_small_atom_utf8() {
499 let data = [131, 119, 4, b't', b'e', b's', b't'];
501 let term = EtfDecoder::decode(&data).unwrap();
502 assert_eq!(term, EtfTerm::Atom("test".to_string()));
503 }
504
505 #[test]
506 fn test_decode_map() {
507 let data = [
510 131, 116, 0, 0, 0, 1, 119, 1, b'a', 97, 1, ];
516 let term = EtfDecoder::decode(&data).unwrap();
517
518 if let EtfTerm::Map(pairs) = term {
519 assert_eq!(pairs.len(), 1);
520 assert_eq!(pairs[0].0, EtfTerm::Atom("a".to_string()));
521 assert_eq!(pairs[0].1, EtfTerm::SmallInt(1));
522 } else {
523 panic!("Expected Map");
524 }
525 }
526
527 #[test]
528 fn test_to_json_value() {
529 let term = EtfTerm::Map(vec![
530 (EtfTerm::Atom("op".to_string()), EtfTerm::SmallInt(10)),
531 (
532 EtfTerm::Atom("d".to_string()),
533 EtfTerm::Map(vec![(
534 EtfTerm::Atom("heartbeat_interval".to_string()),
535 EtfTerm::Int(41250),
536 )]),
537 ),
538 ]);
539
540 let json = EtfDecoder::to_json_value(&term).unwrap();
541 assert_eq!(json["op"], 10);
542 assert_eq!(json["d"]["heartbeat_interval"], 41250);
543 }
544
545 #[test]
546 fn test_atom_special_values() {
547 let term = EtfTerm::Atom("nil".to_string());
549 let json = EtfDecoder::to_json_value(&term).unwrap();
550 assert!(json.is_null());
551
552 let term = EtfTerm::Atom("true".to_string());
554 let json = EtfDecoder::to_json_value(&term).unwrap();
555 assert_eq!(json, serde_json::Value::Bool(true));
556
557 let term = EtfTerm::Atom("false".to_string());
559 let json = EtfDecoder::to_json_value(&term).unwrap();
560 assert_eq!(json, serde_json::Value::Bool(false));
561 }
562}