1use std::{
2 fmt::{Display, Write},
3 ops::Deref,
4};
5
6use bytes::Bytes;
7
8use either::Either;
9use itertools::Itertools;
10use serde::{Deserialize, Serialize};
11use serde_repr::{Deserialize_repr, Serialize_repr};
12
13use crate::{
14 common::{Field, Ty},
15 helpers::CompressOptions,
16 util::Inlinable,
17};
18
19use super::RawData;
20
21#[derive(Debug, Clone)]
22pub struct RawMeta(RawData);
23
24impl<T: Into<RawData>> From<T> for RawMeta {
25 fn from(bytes: T) -> Self {
26 RawMeta(bytes.into())
27 }
28}
29
30impl Deref for RawMeta {
31 type Target = RawData;
32
33 fn deref(&self) -> &Self::Target {
34 &self.0
35 }
36}
37
38impl RawMeta {
39 pub fn new(raw: Bytes) -> Self {
40 RawMeta(raw.into())
41 }
42}
43
44impl Inlinable for RawMeta {
45 fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
46 RawData::read_inlined(reader).map(RawMeta)
47 }
48
49 fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
50 self.deref().write_inlined(wtr)
51 }
52}
53
54#[async_trait::async_trait]
55impl crate::util::AsyncInlinable for RawMeta {
56 async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
57 reader: &mut R,
58 ) -> std::io::Result<Self> {
59 <RawData as crate::util::AsyncInlinable>::read_inlined(reader)
60 .await
61 .map(RawMeta)
62 }
63
64 async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
65 &self,
66 wtr: &mut W,
67 ) -> std::io::Result<usize> {
68 crate::util::AsyncInlinable::write_inlined(self.deref(), wtr).await
69 }
70}
71
72#[derive(Debug, Deserialize, Serialize, Clone)]
163#[serde(rename_all = "camelCase")]
164pub struct FieldMore {
165 #[serde(flatten)]
166 field: Field,
167 #[serde(default)]
168 is_primary_key: bool,
169 #[serde(default, flatten)]
170 compression: Option<CompressOptions>,
171}
172
173impl From<Field> for FieldMore {
174 fn from(f: Field) -> Self {
175 Self {
176 field: f,
177 is_primary_key: false,
178 compression: None,
179 }
180 }
181}
182
183impl FieldMore {
184 pub fn is_primary_key(&self) -> bool {
185 self.is_primary_key
186 }
187
188 fn sql_repr(&self) -> String {
189 let mut sql = self.field.to_string();
190 if let Some(compression) = &self.compression {
191 sql.push(' ');
192 write!(&mut sql, "{}", compression).unwrap();
193 }
195 if self.is_primary_key {
196 sql.push_str(" PRIMARY KEY");
197 }
198 sql
199 }
200}
201#[derive(Debug, Deserialize, Serialize, Clone)]
202pub struct TagWithValue {
203 #[serde(flatten)]
204 pub field: Field,
205 pub value: serde_json::Value,
206}
207
208#[derive(Debug, Deserialize, Serialize, Clone)]
209#[serde(tag = "tableType")]
210#[serde(rename_all = "camelCase")]
211pub enum MetaCreate {
212 #[serde(rename_all = "camelCase")]
213 Super {
214 table_name: String,
215 columns: Vec<FieldMore>,
216 tags: Vec<Field>,
217 },
218 #[serde(rename_all = "camelCase")]
219 Child {
220 table_name: String,
221 using: String,
222 tags: Vec<TagWithValue>,
223 tag_num: Option<usize>,
224 },
225 #[serde(rename_all = "camelCase")]
226 Normal {
227 table_name: String,
228 columns: Vec<FieldMore>,
229 },
230}
231
232impl Display for MetaCreate {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.write_str("CREATE TABLE IF NOT EXISTS ")?;
235 match self {
236 MetaCreate::Super {
237 table_name,
238 columns,
239 tags,
240 } => {
241 debug_assert!(!columns.is_empty(), "{:?}", self);
242 debug_assert!(!tags.is_empty());
243
244 f.write_fmt(format_args!("`{}`", table_name))?;
245 f.write_char('(')?;
246 f.write_str(&columns.iter().map(|f| f.sql_repr()).join(", "))?;
247 f.write_char(')')?;
248
249 f.write_str(" TAGS(")?;
250 f.write_str(&tags.iter().map(|f| f.sql_repr()).join(", "))?;
251 f.write_char(')')?;
252 }
253 MetaCreate::Child {
254 table_name,
255 using,
256 tags,
257 tag_num,
258 } => {
259 if !tags.is_empty() {
260 f.write_fmt(format_args!(
261 "`{}` USING `{}` ({}) TAGS({})",
262 table_name,
263 using,
264 tags.iter().map(|t| t.field.escaped_name()).join(", "),
265 tags.iter()
266 .map(|t| {
267 match t.field.ty() {
268 Ty::Json => format!("'{}'", t.value.as_str().unwrap()),
269 Ty::VarChar | Ty::NChar => {
270 format!("{}", t.value.as_str().unwrap())
271 }
272 _ => format!("{}", t.value),
273 }
274 })
275 .join(", ")
276 ))?;
277 } else {
278 f.write_fmt(format_args!(
279 "`{}` USING `{}` TAGS({})",
280 table_name,
281 using,
282 std::iter::repeat("NULL").take(tag_num.unwrap()).join(",")
283 ))?;
284 }
285 }
286 MetaCreate::Normal {
287 table_name,
288 columns,
289 } => {
290 debug_assert!(!columns.is_empty());
291
292 f.write_fmt(format_args!("`{}`", table_name))?;
293 f.write_char('(')?;
294 f.write_str(&columns.iter().map(|f| f.sql_repr()).join(", "))?;
295 f.write_char(')')?;
296 }
297 }
298 Ok(())
299 }
300}
301
/// The body is currently empty, so this test asserts nothing.
#[test]
fn test_meta_create_to_sql() {}
321
322#[derive(Debug, Deserialize_repr, Serialize_repr, Clone, Copy)]
323#[repr(u8)]
324pub enum AlterType {
325 AddTag = 1,
326 DropTag = 2,
327 RenameTag = 3,
328 SetTagValue = 4,
329 AddColumn = 5,
330 DropColumn = 6,
331 ModifyColumnLength = 7,
332 ModifyTagLength = 8,
333 ModifyTableOption = 9,
334 RenameColumn = 10,
335 }
338
339#[derive(Debug, Deserialize, Serialize, Clone)]
340#[serde(rename_all = "camelCase")]
342pub struct MetaAlter {
343 pub table_name: String,
344 pub alter_type: AlterType,
345 #[serde(flatten, with = "ColField")]
346 pub field: Field,
347 pub col_new_name: Option<String>,
348 pub col_value: Option<String>,
349 pub col_value_null: Option<bool>,
350}
351
352impl Display for MetaAlter {
353 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354 match self.alter_type {
355 AlterType::AddTag => f.write_fmt(format_args!(
356 "ALTER TABLE `{}` ADD TAG {}",
357 self.table_name,
358 self.field.sql_repr()
359 )),
360 AlterType::DropTag => f.write_fmt(format_args!(
361 "ALTER TABLE `{}` DROP TAG `{}`",
362 self.table_name,
363 self.field.name()
364 )),
365 AlterType::RenameTag => f.write_fmt(format_args!(
366 "ALTER TABLE `{}` RENAME TAG `{}` `{}`",
367 self.table_name,
368 self.field.name(),
369 self.col_new_name.as_ref().unwrap()
370 )),
371 AlterType::SetTagValue => {
372 f.write_fmt(format_args!(
373 "ALTER TABLE `{}` SET TAG `{}` ",
374 self.table_name,
375 self.field.name()
376 ))?;
377 if self.col_value_null.unwrap_or(false) {
378 f.write_str("NULL")
379 } else if self.field.ty.is_var_type() {
380 f.write_fmt(format_args!("'{}'", self.col_value.as_ref().unwrap()))
381 } else {
382 f.write_fmt(format_args!("{}", self.col_value.as_ref().unwrap()))
383 }
384 }
385 AlterType::AddColumn => f.write_fmt(format_args!(
386 "ALTER TABLE `{}` ADD COLUMN {}",
387 self.table_name,
388 self.field.sql_repr()
389 )),
390 AlterType::DropColumn => f.write_fmt(format_args!(
391 "ALTER TABLE `{}` DROP COLUMN `{}`",
392 self.table_name,
393 self.field.name()
394 )),
395 AlterType::ModifyColumnLength => f.write_fmt(format_args!(
396 "ALTER TABLE `{}` MODIFY COLUMN {}",
397 self.table_name,
398 self.field.sql_repr(),
399 )),
400 AlterType::ModifyTagLength => f.write_fmt(format_args!(
401 "ALTER TABLE `{}` MODIFY TAG {}",
402 self.table_name,
403 self.field.sql_repr(),
404 )),
405 AlterType::ModifyTableOption => todo!(),
406 AlterType::RenameColumn => f.write_fmt(format_args!(
407 "ALTER TABLE `{}` RENAME COLUMN `{}` `{}`",
408 self.table_name,
409 self.field.name(),
410 self.col_new_name.as_ref().unwrap()
411 )),
412 }
418 }
419}
420
421#[derive(Debug, Deserialize, Serialize, Clone)]
422#[serde(rename_all = "camelCase")]
423#[serde(untagged)]
424pub enum MetaDrop {
425 #[serde(rename_all = "camelCase")]
426 Super {
427 table_name: String,
429 },
430 #[serde(rename_all = "camelCase")]
431 Other {
432 table_name_list: Vec<String>,
434 },
435}
436
437impl Display for MetaDrop {
438 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439 match self {
440 MetaDrop::Super { table_name } => {
441 f.write_fmt(format_args!("DROP TABLE IF EXISTS `{}`", table_name))
442 }
443 MetaDrop::Other { table_name_list } => f.write_fmt(format_args!(
444 "DROP TABLE IF EXISTS {}",
445 table_name_list.iter().map(|n| format!("`{n}`")).join(" ")
446 )),
447 }
448 }
449}
450
451#[derive(Debug, Deserialize, Serialize, Clone)]
452pub struct MetaDelete {
453 sql: String,
454}
455
456impl Display for MetaDelete {
457 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
458 f.write_str(&self.sql)
459 }
460}
461#[derive(Debug, Deserialize, Serialize, Clone)]
462#[serde(tag = "type")]
463#[serde(rename_all = "camelCase")]
464pub enum MetaUnit {
465 Create(MetaCreate),
466 Alter(MetaAlter),
467 Drop(MetaDrop),
468 Delete(MetaDelete),
469}
470
471impl Display for MetaUnit {
472 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
473 match self {
474 MetaUnit::Create(meta) => meta.fmt(f),
475 MetaUnit::Alter(alter) => alter.fmt(f),
476 MetaUnit::Drop(drop) => drop.fmt(f),
477 MetaUnit::Delete(delete) => delete.fmt(f),
478 }
480 }
481}
482
483#[derive(Debug, Deserialize, Serialize, Clone)]
484#[serde(remote = "Field")]
485pub struct ColField {
486 #[serde(rename = "colName")]
487 name: String,
488 #[serde(default)]
489 #[serde(rename = "colType")]
490 ty: Ty,
491 #[serde(default)]
492 #[serde(rename = "colLength")]
493 bytes: u32,
494}
495
496impl From<ColField> for Field {
497 fn from(f: ColField) -> Self {
498 Self {
499 name: f.name,
500 ty: f.ty,
501 bytes: f.bytes,
502 }
503 }
504}
505
506#[derive(Debug, Deserialize, Serialize, Clone)]
507#[serde(untagged)]
508pub enum JsonMeta {
509 Plural {
510 tmq_meta_version: faststr::FastStr,
511 metas: Vec<MetaUnit>,
512 },
513 Single(MetaUnit),
514}
515
516impl JsonMeta {
517 pub fn is_single(&self) -> bool {
519 matches!(self, JsonMeta::Single { .. })
520 }
521
522 pub fn is_plural(&self) -> bool {
524 matches!(self, JsonMeta::Plural { .. })
525 }
526
527 pub fn iter(&self) -> JsonMetaIter {
528 match self {
529 JsonMeta::Plural { metas, .. } => JsonMetaIter {
530 iter: Either::Left(metas.iter()),
531 },
532 JsonMeta::Single(meta) => JsonMetaIter {
533 iter: Either::Right(std::iter::once(meta)),
534 },
535 }
536 }
537
538 pub fn iter_mut(&mut self) -> JsonMetaIterMut {
539 match self {
540 JsonMeta::Plural { metas, .. } => JsonMetaIterMut {
541 iter: Either::Left(metas.iter_mut()),
542 },
543 JsonMeta::Single(meta) => JsonMetaIterMut {
544 iter: Either::Right(std::iter::once(meta)),
545 },
546 }
547 }
548}
549
550pub struct JsonMetaIter<'a> {
551 iter: Either<std::slice::Iter<'a, MetaUnit>, std::iter::Once<&'a MetaUnit>>,
552}
553
554impl<'a> Iterator for JsonMetaIter<'a> {
555 type Item = &'a MetaUnit;
556
557 #[inline]
558 fn next(&mut self) -> Option<Self::Item> {
559 match &mut self.iter {
560 Either::Left(iter) => iter.next(),
561 Either::Right(iter) => iter.next(),
562 }
563 }
564}
565
566impl<'a> ExactSizeIterator for JsonMetaIter<'a> {
567 #[inline]
568 fn len(&self) -> usize {
569 match &self.iter {
570 Either::Left(iter) => iter.len(),
571 Either::Right(iter) => iter.len(),
572 }
573 }
574}
575
576impl<'a> DoubleEndedIterator for JsonMetaIter<'a> {
577 #[inline]
578 fn next_back(&mut self) -> Option<Self::Item> {
579 match &mut self.iter {
580 Either::Left(iter) => iter.next_back(),
581 Either::Right(iter) => iter.next_back(),
582 }
583 }
584}
585
586pub struct JsonMetaIterMut<'a> {
587 iter: Either<std::slice::IterMut<'a, MetaUnit>, std::iter::Once<&'a mut MetaUnit>>,
588}
589
590impl<'a> Iterator for JsonMetaIterMut<'a> {
591 type Item = &'a mut MetaUnit;
592
593 #[inline]
594 fn next(&mut self) -> Option<Self::Item> {
595 match &mut self.iter {
596 Either::Left(iter) => iter.next(),
597 Either::Right(iter) => iter.next(),
598 }
599 }
600}
601
602impl<'a> ExactSizeIterator for JsonMetaIterMut<'a> {
603 #[inline]
604 fn len(&self) -> usize {
605 match &self.iter {
606 Either::Left(iter) => iter.len(),
607 Either::Right(iter) => iter.len(),
608 }
609 }
610}
611
612impl<'a> DoubleEndedIterator for JsonMetaIterMut<'a> {
613 #[inline]
614 fn next_back(&mut self) -> Option<Self::Item> {
615 match &mut self.iter {
616 Either::Left(iter) => iter.next_back(),
617 Either::Right(iter) => iter.next_back(),
618 }
619 }
620}
621
622pub struct JsonMetaIntoIter(Either<std::vec::IntoIter<MetaUnit>, std::iter::Once<MetaUnit>>);
623
624impl Iterator for JsonMetaIntoIter {
625 type Item = MetaUnit;
626
627 fn next(&mut self) -> Option<Self::Item> {
628 match &mut self.0 {
629 Either::Left(iter) => iter.next(),
630 Either::Right(iter) => iter.next(),
631 }
632 }
633}
634
635impl ExactSizeIterator for JsonMetaIntoIter {
636 fn len(&self) -> usize {
637 match &self.0 {
638 Either::Left(iter) => iter.len(),
639 Either::Right(_) => 1,
640 }
641 }
642}
643
644impl DoubleEndedIterator for JsonMetaIntoIter {
645 fn next_back(&mut self) -> Option<Self::Item> {
646 match &mut self.0 {
647 Either::Left(iter) => iter.next_back(),
648 Either::Right(iter) => iter.next(),
649 }
650 }
651}
652
653impl IntoIterator for JsonMeta {
654 type Item = MetaUnit;
655 type IntoIter = JsonMetaIntoIter;
656
657 fn into_iter(self) -> Self::IntoIter {
658 match self {
659 JsonMeta::Plural { metas, .. } => JsonMetaIntoIter(Either::Left(metas.into_iter())),
660 JsonMeta::Single(meta) => JsonMetaIntoIter(Either::Right(std::iter::once(meta))),
661 }
662 }
663}
664
665impl<'a> IntoIterator for &'a JsonMeta {
666 type Item = &'a MetaUnit;
667 type IntoIter = JsonMetaIter<'a>;
668
669 fn into_iter(self) -> Self::IntoIter {
670 self.iter()
671 }
672}
673impl<'a> IntoIterator for &'a mut JsonMeta {
674 type Item = &'a mut MetaUnit;
675 type IntoIter = JsonMetaIterMut<'a>;
676
677 fn into_iter(self) -> Self::IntoIter {
678 self.iter_mut()
679 }
680}
681
/// Parses a single `create` meta with compression options and checks the SQL
/// it renders.
#[test]
fn test_json_meta_compress() {
    let json ="{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"t3\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"obj_id\",\"type\":5,\"isPrimarykey\":true,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"data1\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"data2\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"high\"}],\"tags\":[]}";
    let expected = "CREATE TABLE IF NOT EXISTS `t3`(`ts` TIMESTAMP ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', `obj_id` BIGINT ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', `data1` FLOAT ENCODE 'delta-d' COMPRESS 'lz4' LEVEL 'medium', `data2` INT ENCODE 'simple8b' COMPRESS 'lz4' LEVEL 'high')";

    let meta: MetaUnit = serde_json::from_str(json).unwrap();
    println!("{}", meta);
    assert_eq!(meta.to_string(), expected);
}
689
/// Parses a versioned `metas` document and checks the SQL rendered for its
/// first entry.
#[test]
fn test_json_meta_plural() {
    let json ="{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"t3\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"obj_id\",\"type\":5,\"isPrimarykey\":true,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"data1\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"data2\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"high\"}],\"tags\":[]}] }";
    let expected = "CREATE TABLE IF NOT EXISTS `t3`(`ts` TIMESTAMP ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', `obj_id` BIGINT ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', `data1` FLOAT ENCODE 'delta-d' COMPRESS 'lz4' LEVEL 'medium', `data2` INT ENCODE 'simple8b' COMPRESS 'lz4' LEVEL 'high')";

    let meta: JsonMeta = serde_json::from_str(json).unwrap();
    println!("{:?}", meta);
    let first_sql = meta.iter().next().map(ToString::to_string).unwrap();
    assert_eq!(first_sql, expected);
}