Skip to main content

qraft_core/
quex_exec.rs

1use std::{collections::HashSet, hash::Hash};
2
3use quex::{Driver, Executor, ParamRef, ParamSource, PreparedStatement, RowStream};
4
5use crate::{
6    AttachBelongsToMany, AttachMorphToMany, MySql, Postgres, Query, Sqlite, SyncBelongsToMany,
7    SyncMorphToMany, SyncResult, SyncWithoutDetachingBelongsToMany,
8    SyncWithoutDetachingMorphToMany, SyncWithoutDetachingResult, ToggleBelongsToMany,
9    ToggleMorphToMany, ToggleResult,
10    aggregate::count,
11    alias::Alias,
12    MorphToManyValueSpec, build_many_to_many_values, build_morph_to_many_values,
13    builder::{Delete, Insert, ReturningInsert, ReturningUpdate, Update},
14    insert_into,
15    lower::LowerCtx,
16    param::Param,
17    query::{
18        Compiled, LockedQuery, LockedQueryOf, LowerProject, QueryOf, TypedCompiled, select, star,
19    },
20    raw::{Raw, RawQuery, RawScalar},
21    span::{Span, TextSource, TextSpan},
22};
23
24struct CompiledParams<'a> {
25    params: &'a [Param],
26    data: &'a [u8],
27}
28
29impl CompiledParams<'_> {
30    fn blob(&self, span: Span) -> &[u8] {
31        &self.data[span.start as usize..(span.start + span.len) as usize]
32    }
33
34    fn text(&self, span: TextSpan) -> &str {
35        match span.0 {
36            TextSource::StaticText(value) => value,
37            TextSource::Text(span) => {
38                let bytes = self.blob(span);
39                unsafe { std::str::from_utf8_unchecked(bytes) }
40            }
41        }
42    }
43}
44
45impl ParamSource for CompiledParams<'_> {
46    fn len(&self) -> usize {
47        self.params.len()
48    }
49
50    fn value_at(&self, index: usize) -> ParamRef<'_> {
51        match self.params[index] {
52            Param::Null => ParamRef::Null,
53            Param::Bool(Some(value)) => ParamRef::I64(i64::from(value)),
54            Param::Bool(None) => ParamRef::Null,
55            Param::Float(Some(value)) => ParamRef::F64(f64::from(value)),
56            Param::Float(None) => ParamRef::Null,
57            Param::Double(Some(value)) => ParamRef::F64(value),
58            Param::Double(None) => ParamRef::Null,
59            Param::Int(Some(value)) => ParamRef::I64(i64::from(value)),
60            Param::Int(None) => ParamRef::Null,
61            Param::UInt(Some(value)) => ParamRef::U64(u64::from(value)),
62            Param::UInt(None) => ParamRef::Null,
63            Param::UBigInt(Some(value)) => ParamRef::U64(value),
64            Param::UBigInt(None) => ParamRef::Null,
65            Param::BigInt(Some(value)) => ParamRef::I64(value),
66            Param::BigInt(None) => ParamRef::Null,
67            Param::Text(Some(span)) => ParamRef::Str(self.text(span)),
68            Param::Text(None) => ParamRef::Null,
69            Param::Blob(Some(span)) => ParamRef::Bytes(self.blob(span)),
70            Param::Blob(None) => ParamRef::Null,
71        }
72    }
73}
74
75fn compiled_params(compiled: &Compiled) -> CompiledParams<'_> {
76    CompiledParams {
77        params: &compiled.params,
78        data: compiled.data.as_slice(),
79    }
80}
81
82async fn compiled_fetch<'e, E>(compiled: Compiled, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
83where
84    E: Executor + ?Sized,
85{
86    if compiled.params.is_empty() {
87        exec.query(&compiled.sql).await
88    } else {
89        let params = compiled_params(&compiled);
90        exec.query_prepared_source(&compiled.sql, &params).await
91    }
92}
93
94async fn compiled_execute<E>(compiled: Compiled, mut exec: E) -> quex::Result<quex::ExecResult>
95where
96    E: Executor,
97{
98    let mut stmt = exec.prepare(&compiled.sql).await?;
99    let params = compiled_params(&compiled);
100    stmt.exec_source(&params).await
101}
102
103async fn compiled_all<T, E>(compiled: Compiled, mut exec: E) -> quex::Result<Vec<T>>
104where
105    T: quex::FromRow,
106    E: Executor,
107{
108    let mut rows = if compiled.params.is_empty() {
109        exec.query(&compiled.sql).await?
110    } else {
111        let params = compiled_params(&compiled);
112        exec.query_prepared_source(&compiled.sql, &params).await?
113    };
114    let mut out = Vec::new();
115    while let Some(row) = rows.next().await? {
116        out.push(row.decode::<T>()?);
117    }
118    Ok(out)
119}
120
121async fn compiled_one<T, E>(compiled: Compiled, mut exec: E) -> quex::Result<T>
122where
123    T: quex::FromRow,
124    E: Executor,
125{
126    let mut rows = if compiled.params.is_empty() {
127        exec.query(&compiled.sql).await?
128    } else {
129        let params = compiled_params(&compiled);
130        exec.query_prepared_source(&compiled.sql, &params).await?
131    };
132    match rows.next().await? {
133        Some(row) => Ok(row.decode::<T>()?),
134        None => Err(quex::Error::Unsupported("query returned no rows".into())),
135    }
136}
137
138async fn compiled_optional<T, E>(compiled: Compiled, mut exec: E) -> quex::Result<Option<T>>
139where
140    T: quex::FromRow,
141    E: Executor,
142{
143    let mut rows = if compiled.params.is_empty() {
144        exec.query(&compiled.sql).await?
145    } else {
146        let params = compiled_params(&compiled);
147        exec.query_prepared_source(&compiled.sql, &params).await?
148    };
149    match rows.next().await? {
150        Some(row) => Ok(Some(row.decode::<T>()?)),
151        None => Ok(None),
152    }
153}
154
155fn can_direct_count(query: &Query) -> bool {
156    !query.distinct
157        && query.group_by.is_empty()
158        && query.havings.is_empty()
159        && query.limit.is_empty()
160        && query.offset.is_empty()
161        && query.lock.is_none()
162        && query.compound.is_none()
163}
164
165fn into_count_query(query: Query) -> Query {
166    if !can_direct_count(&query) {
167        return select(count(star()).alias("aggregate")).from(query.alias("__qraft_count"));
168    }
169
170    let Query {
171        from,
172        filters,
173        ctes,
174        params,
175        data,
176        ..
177    } = query;
178
179    let mut count_query = Query::default();
180    count_query.from = from;
181    count_query.filters = filters;
182    count_query.ctes = ctes;
183    count_query.params = params;
184    count_query.data = data;
185
186    let mut ctx = LowerCtx {
187        instrs: &mut count_query.project,
188        params: &mut count_query.params,
189        data: &mut count_query.data,
190    };
191    count(star()).alias("aggregate").lower_project(&mut ctx);
192
193    count_query
194}
195
196async fn count_query<E>(query: Query, exec: E) -> quex::Result<i64>
197where
198    E: Executor,
199{
200    into_count_query(query).one(exec).await
201}
202
203impl Compiled {
204    pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
205    where
206        E: Executor + ?Sized,
207    {
208        compiled_fetch(self, exec).await
209    }
210
211    pub async fn execute<E>(self, exec: E) -> quex::Result<quex::ExecResult>
212    where
213        E: Executor,
214    {
215        compiled_execute(self, exec).await
216    }
217}
218
219impl<T> TypedCompiled<T> {
220    pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
221    where
222        T: quex::FromRow,
223        E: Executor + ?Sized,
224    {
225        compiled_fetch(
226            Compiled {
227                sql: self.sql,
228                params: self.params,
229                data: self.data,
230            },
231            exec,
232        )
233        .await
234    }
235
236    pub async fn execute<E>(self, exec: E) -> quex::Result<quex::ExecResult>
237    where
238        E: Executor,
239    {
240        compiled_execute(
241            Compiled {
242                sql: self.sql,
243                params: self.params,
244                data: self.data,
245            },
246            exec,
247        )
248        .await
249    }
250}
251
252impl<T> TypedCompiled<T>
253where
254    T: quex::FromRow,
255{
256    pub async fn all<E>(self, exec: E) -> quex::Result<Vec<T>>
257    where
258        E: Executor,
259    {
260        compiled_all(
261            Compiled {
262                sql: self.sql,
263                params: self.params,
264                data: self.data,
265            },
266            exec,
267        )
268        .await
269    }
270
271    pub async fn one<E>(self, exec: E) -> quex::Result<T>
272    where
273        E: Executor,
274    {
275        compiled_one(
276            Compiled {
277                sql: self.sql,
278                params: self.params,
279                data: self.data,
280            },
281            exec,
282        )
283        .await
284    }
285
286    pub async fn optional<E>(self, exec: E) -> quex::Result<Option<T>>
287    where
288        E: Executor,
289    {
290        compiled_optional(
291            Compiled {
292                sql: self.sql,
293                params: self.params,
294                data: self.data,
295            },
296            exec,
297        )
298        .await
299    }
300}
301
302macro_rules! impl_runtime_typed_query {
303    ($ty:ident$(<$m:ident>)?) => {
304        impl$(<$m>)? $ty$(<$m>)?
305        where
306            $($m: quex::FromRow,)?
307        {
308            pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
309            where
310                E: Executor + ?Sized,
311            {
312                match exec.driver() {
313                    Driver::Pgsql => self.into_compiled::<Postgres>().fetch(exec).await,
314                    Driver::Sqlite => self.into_compiled::<Sqlite>().fetch(exec).await,
315                    Driver::Mysql => self.into_compiled::<MySql>().fetch(exec).await,
316                }
317            }
318
319            pub async fn all(self, exec: impl Executor) -> quex::Result<Vec<$($m)?>>
320            {
321                let driver = exec.driver();
322                match driver {
323                    Driver::Pgsql => self.into_compiled::<Postgres>().all(exec).await,
324                    Driver::Sqlite => self.into_compiled::<Sqlite>().all(exec).await,
325                    Driver::Mysql => self.into_compiled::<MySql>().all(exec).await,
326                }
327            }
328
329            pub async fn one(self, exec: impl Executor) -> quex::Result<$($m)?>
330            {
331                let driver = exec.driver();
332                match driver {
333                    Driver::Pgsql => self.into_compiled::<Postgres>().one(exec).await,
334                    Driver::Sqlite => self.into_compiled::<Sqlite>().one(exec).await,
335                    Driver::Mysql => self.into_compiled::<MySql>().one(exec).await,
336                }
337            }
338
339            pub async fn optional(self, exec: impl Executor) -> quex::Result<Option<$($m)?>>
340            {
341                let driver = exec.driver();
342                match driver {
343                    Driver::Pgsql => self.into_compiled::<Postgres>().optional(exec).await,
344                    Driver::Sqlite => self.into_compiled::<Sqlite>().optional(exec).await,
345                    Driver::Mysql => self.into_compiled::<MySql>().optional(exec).await,
346                }
347            }
348        }
349    };
350}
351
352impl_runtime_typed_query!(QueryOf<M>);
353impl_runtime_typed_query!(LockedQueryOf<M>);
354
355impl Query {
356    pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
357    where
358        E: Executor + ?Sized,
359    {
360        match exec.driver() {
361            Driver::Pgsql => self.into_compiled::<Postgres>().fetch(exec).await,
362            Driver::Sqlite => self.into_compiled::<Sqlite>().fetch(exec).await,
363            Driver::Mysql => self.into_compiled::<MySql>().fetch(exec).await,
364        }
365    }
366
367    pub async fn all<T>(self, exec: impl Executor) -> quex::Result<Vec<T>>
368    where
369        T: quex::FromRow,
370    {
371        let driver = exec.driver();
372        match driver {
373            Driver::Pgsql => {
374                self.typed::<T>()
375                    .into_compiled::<Postgres>()
376                    .all(exec)
377                    .await
378            }
379            Driver::Sqlite => self.typed::<T>().into_compiled::<Sqlite>().all(exec).await,
380            Driver::Mysql => self.typed::<T>().into_compiled::<MySql>().all(exec).await,
381        }
382    }
383
384    pub async fn one<T>(self, exec: impl Executor) -> quex::Result<T>
385    where
386        T: quex::FromRow,
387    {
388        let driver = exec.driver();
389        match driver {
390            Driver::Pgsql => {
391                self.typed::<T>()
392                    .into_compiled::<Postgres>()
393                    .one(exec)
394                    .await
395            }
396            Driver::Sqlite => self.typed::<T>().into_compiled::<Sqlite>().one(exec).await,
397            Driver::Mysql => self.typed::<T>().into_compiled::<MySql>().one(exec).await,
398        }
399    }
400
401    pub async fn optional<T>(self, exec: impl Executor) -> quex::Result<Option<T>>
402    where
403        T: quex::FromRow,
404    {
405        let driver = exec.driver();
406        match driver {
407            Driver::Pgsql => {
408                self.typed::<T>()
409                    .into_compiled::<Postgres>()
410                    .optional(exec)
411                    .await
412            }
413            Driver::Sqlite => {
414                self.typed::<T>()
415                    .into_compiled::<Sqlite>()
416                    .optional(exec)
417                    .await
418            }
419            Driver::Mysql => {
420                self.typed::<T>()
421                    .into_compiled::<MySql>()
422                    .optional(exec)
423                    .await
424            }
425        }
426    }
427
428    pub async fn count<E>(self, exec: E) -> quex::Result<i64>
429    where
430        E: Executor,
431    {
432        count_query(self, exec).await
433    }
434}
435
436impl LockedQuery {
437    pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
438    where
439        E: Executor + ?Sized,
440    {
441        match exec.driver() {
442            Driver::Pgsql => self.into_compiled::<Postgres>().fetch(exec).await,
443            Driver::Sqlite => self.into_compiled::<Sqlite>().fetch(exec).await,
444            Driver::Mysql => self.into_compiled::<MySql>().fetch(exec).await,
445        }
446    }
447
448    pub async fn all<T>(self, exec: impl Executor) -> quex::Result<Vec<T>>
449    where
450        T: quex::FromRow,
451    {
452        let driver = exec.driver();
453        match driver {
454            Driver::Pgsql => {
455                self.typed::<T>()
456                    .into_compiled::<Postgres>()
457                    .all(exec)
458                    .await
459            }
460            Driver::Sqlite => self.typed::<T>().into_compiled::<Sqlite>().all(exec).await,
461            Driver::Mysql => self.typed::<T>().into_compiled::<MySql>().all(exec).await,
462        }
463    }
464
465    pub async fn one<T>(self, exec: impl Executor) -> quex::Result<T>
466    where
467        T: quex::FromRow,
468    {
469        let driver = exec.driver();
470        match driver {
471            Driver::Pgsql => {
472                self.typed::<T>()
473                    .into_compiled::<Postgres>()
474                    .one(exec)
475                    .await
476            }
477            Driver::Sqlite => self.typed::<T>().into_compiled::<Sqlite>().one(exec).await,
478            Driver::Mysql => self.typed::<T>().into_compiled::<MySql>().one(exec).await,
479        }
480    }
481
482    pub async fn optional<T>(self, exec: impl Executor) -> quex::Result<Option<T>>
483    where
484        T: quex::FromRow,
485    {
486        let driver = exec.driver();
487        match driver {
488            Driver::Pgsql => {
489                self.typed::<T>()
490                    .into_compiled::<Postgres>()
491                    .optional(exec)
492                    .await
493            }
494            Driver::Sqlite => {
495                self.typed::<T>()
496                    .into_compiled::<Sqlite>()
497                    .optional(exec)
498                    .await
499            }
500            Driver::Mysql => {
501                self.typed::<T>()
502                    .into_compiled::<MySql>()
503                    .optional(exec)
504                    .await
505            }
506        }
507    }
508
509    pub async fn count<E>(self, exec: E) -> quex::Result<i64>
510    where
511        E: Executor,
512    {
513        count_query(<Query as From<LockedQuery>>::from(self), exec).await
514    }
515}
516
517impl<M> Insert<M>
518where
519    M: crate::Qrafting,
520{
521    pub async fn execute<E>(self, exec: E) -> quex::Result<quex::ExecResult>
522    where
523        E: Executor,
524    {
525        let driver = exec.driver();
526        let sql = match driver {
527            Driver::Pgsql => self.to_sql::<Postgres>(),
528            Driver::Sqlite => self.to_sql::<Sqlite>(),
529            Driver::Mysql => self.to_sql::<MySql>(),
530        };
531
532        Compiled {
533            sql,
534            params: self.query.params,
535            data: self.query.data,
536        }
537        .execute(exec)
538        .await
539    }
540}
541
542impl<M, T> ReturningInsert<M, T>
543where
544    M: crate::Qrafting,
545    T: quex::FromRow,
546{
547    pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
548    where
549        E: Executor + ?Sized,
550    {
551        match exec.driver() {
552            Driver::Pgsql => self.into_compiled::<Postgres>().fetch(exec).await,
553            Driver::Sqlite => self.into_compiled::<Sqlite>().fetch(exec).await,
554            Driver::Mysql => self.into_compiled::<MySql>().fetch(exec).await,
555        }
556    }
557
558    pub async fn all<R>(self, exec: impl Executor) -> quex::Result<Vec<R>>
559    where
560        R: quex::FromRow,
561    {
562        let driver = exec.driver();
563        match driver {
564            Driver::Pgsql => {
565                self.typed::<R>()
566                    .into_compiled::<Postgres>()
567                    .all(exec)
568                    .await
569            }
570            Driver::Sqlite => self.typed::<R>().into_compiled::<Sqlite>().all(exec).await,
571            Driver::Mysql => self.typed::<R>().into_compiled::<MySql>().all(exec).await,
572        }
573    }
574
575    pub async fn one<R>(self, exec: impl Executor) -> quex::Result<R>
576    where
577        R: quex::FromRow,
578    {
579        let driver = exec.driver();
580        match driver {
581            Driver::Pgsql => {
582                self.typed::<R>()
583                    .into_compiled::<Postgres>()
584                    .one(exec)
585                    .await
586            }
587            Driver::Sqlite => self.typed::<R>().into_compiled::<Sqlite>().one(exec).await,
588            Driver::Mysql => self.typed::<R>().into_compiled::<MySql>().one(exec).await,
589        }
590    }
591
592    pub async fn optional<R>(self, exec: impl Executor) -> quex::Result<Option<R>>
593    where
594        R: quex::FromRow,
595    {
596        let driver = exec.driver();
597        match driver {
598            Driver::Pgsql => {
599                self.typed::<R>()
600                    .into_compiled::<Postgres>()
601                    .optional(exec)
602                    .await
603            }
604            Driver::Sqlite => {
605                self.typed::<R>()
606                    .into_compiled::<Sqlite>()
607                    .optional(exec)
608                    .await
609            }
610            Driver::Mysql => {
611                self.typed::<R>()
612                    .into_compiled::<MySql>()
613                    .optional(exec)
614                    .await
615            }
616        }
617    }
618}
619
620impl<M> Update<M> {
621    pub async fn execute<E>(self, exec: E) -> quex::Result<quex::ExecResult>
622    where
623        E: Executor,
624    {
625        let driver = exec.driver();
626        match driver {
627            Driver::Pgsql => self.into_compiled::<Postgres>().execute(exec).await,
628            Driver::Sqlite => self.into_compiled::<Sqlite>().execute(exec).await,
629            Driver::Mysql => self.into_compiled::<MySql>().execute(exec).await,
630        }
631    }
632}
633
634impl<M, T> ReturningUpdate<M, T>
635where
636    T: quex::FromRow,
637{
638    pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
639    where
640        E: Executor + ?Sized,
641    {
642        match exec.driver() {
643            Driver::Pgsql => self.into_compiled::<Postgres>().fetch(exec).await,
644            Driver::Sqlite => self.into_compiled::<Sqlite>().fetch(exec).await,
645            Driver::Mysql => self.into_compiled::<MySql>().fetch(exec).await,
646        }
647    }
648
649    pub async fn all<R>(self, exec: impl Executor) -> quex::Result<Vec<R>>
650    where
651        R: quex::FromRow,
652    {
653        let driver = exec.driver();
654        match driver {
655            Driver::Pgsql => {
656                self.typed::<R>()
657                    .into_compiled::<Postgres>()
658                    .all(exec)
659                    .await
660            }
661            Driver::Sqlite => self.typed::<R>().into_compiled::<Sqlite>().all(exec).await,
662            Driver::Mysql => self.typed::<R>().into_compiled::<MySql>().all(exec).await,
663        }
664    }
665
666    pub async fn one<R>(self, exec: impl Executor) -> quex::Result<R>
667    where
668        R: quex::FromRow,
669    {
670        let driver = exec.driver();
671        match driver {
672            Driver::Pgsql => {
673                self.typed::<R>()
674                    .into_compiled::<Postgres>()
675                    .one(exec)
676                    .await
677            }
678            Driver::Sqlite => self.typed::<R>().into_compiled::<Sqlite>().one(exec).await,
679            Driver::Mysql => self.typed::<R>().into_compiled::<MySql>().one(exec).await,
680        }
681    }
682
683    pub async fn optional<R>(self, exec: impl Executor) -> quex::Result<Option<R>>
684    where
685        R: quex::FromRow,
686    {
687        let driver = exec.driver();
688        match driver {
689            Driver::Pgsql => {
690                self.typed::<R>()
691                    .into_compiled::<Postgres>()
692                    .optional(exec)
693                    .await
694            }
695            Driver::Sqlite => {
696                self.typed::<R>()
697                    .into_compiled::<Sqlite>()
698                    .optional(exec)
699                    .await
700            }
701            Driver::Mysql => {
702                self.typed::<R>()
703                    .into_compiled::<MySql>()
704                    .optional(exec)
705                    .await
706            }
707        }
708    }
709}
710
711impl<T> Delete<T> {
712    pub async fn execute<E>(self, exec: E) -> quex::Result<quex::ExecResult>
713    where
714        E: Executor,
715    {
716        let driver = exec.driver();
717        match driver {
718            Driver::Pgsql => self.into_compiled::<Postgres>().execute(exec).await,
719            Driver::Sqlite => self.into_compiled::<Sqlite>().execute(exec).await,
720            Driver::Mysql => self.into_compiled::<MySql>().execute(exec).await,
721        }
722    }
723}
724
725impl Raw {
726    pub async fn execute(self, exec: impl Executor) -> quex::Result<quex::ExecResult> {
727        let driver = exec.driver();
728        match driver {
729            Driver::Pgsql => self.into_compiled().execute(exec).await,
730            Driver::Sqlite => self.into_compiled().execute(exec).await,
731            Driver::Mysql => self.into_compiled().execute(exec).await,
732        }
733    }
734}
735
736impl<T> RawQuery<T>
737where
738    T: quex::FromRow,
739{
740    pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
741    where
742        E: Executor + ?Sized,
743    {
744        match exec.driver() {
745            Driver::Pgsql => self.into_compiled().fetch(exec).await,
746            Driver::Sqlite => self.into_compiled().fetch(exec).await,
747            Driver::Mysql => self.into_compiled().fetch(exec).await,
748        }
749    }
750
751    pub async fn all(self, exec: impl Executor) -> quex::Result<Vec<T>> {
752        let driver = exec.driver();
753        match driver {
754            Driver::Pgsql => self.into_compiled().all(exec).await,
755            Driver::Sqlite => self.into_compiled().all(exec).await,
756            Driver::Mysql => self.into_compiled().all(exec).await,
757        }
758    }
759
760    pub async fn one(self, exec: impl Executor) -> quex::Result<T> {
761        let driver = exec.driver();
762        match driver {
763            Driver::Pgsql => self.into_compiled().one(exec).await,
764            Driver::Sqlite => self.into_compiled().one(exec).await,
765            Driver::Mysql => self.into_compiled().one(exec).await,
766        }
767    }
768
769    pub async fn optional(self, exec: impl Executor) -> quex::Result<Option<T>> {
770        let driver = exec.driver();
771        match driver {
772            Driver::Pgsql => self.into_compiled().optional(exec).await,
773            Driver::Sqlite => self.into_compiled().optional(exec).await,
774            Driver::Mysql => self.into_compiled().optional(exec).await,
775        }
776    }
777}
778
779impl<T> RawScalar<T>
780where
781    T: crate::TypeMeta + crate::TypeCast,
782    <T as crate::TypeCast>::From: quex::FromRow,
783{
784    pub async fn fetch<'e, E>(self, exec: &'e mut E) -> quex::Result<E::Rows<'e>>
785    where
786        E: Executor + ?Sized,
787    {
788        match exec.driver() {
789            Driver::Pgsql => self.into_compiled().fetch(exec).await,
790            Driver::Sqlite => self.into_compiled().fetch(exec).await,
791            Driver::Mysql => self.into_compiled().fetch(exec).await,
792        }
793    }
794
795    pub async fn all(self, exec: impl Executor) -> quex::Result<Vec<<T as crate::TypeCast>::From>> {
796        let driver = exec.driver();
797        match driver {
798            Driver::Pgsql => self.into_compiled().all(exec).await,
799            Driver::Sqlite => self.into_compiled().all(exec).await,
800            Driver::Mysql => self.into_compiled().all(exec).await,
801        }
802    }
803
804    pub async fn one(self, exec: impl Executor) -> quex::Result<<T as crate::TypeCast>::From> {
805        let driver = exec.driver();
806        match driver {
807            Driver::Pgsql => self.into_compiled().one(exec).await,
808            Driver::Sqlite => self.into_compiled().one(exec).await,
809            Driver::Mysql => self.into_compiled().one(exec).await,
810        }
811    }
812
813    pub async fn optional(
814        self,
815        exec: impl Executor,
816    ) -> quex::Result<Option<<T as crate::TypeCast>::From>> {
817        let driver = exec.driver();
818        match driver {
819            Driver::Pgsql => self.into_compiled().optional(exec).await,
820            Driver::Sqlite => self.into_compiled().optional(exec).await,
821            Driver::Mysql => self.into_compiled().optional(exec).await,
822        }
823    }
824}
825
826impl<M> QueryOf<M>
827where
828    M: crate::Qrafting,
829{
830    pub async fn count<E>(self, exec: E) -> quex::Result<i64>
831    where
832        E: Executor,
833    {
834        count_query(self.into(), exec).await
835    }
836}
837
838impl<M> LockedQueryOf<M>
839where
840    M: crate::Qrafting + quex::FromRow,
841{
842    pub async fn count<E>(self, exec: E) -> quex::Result<i64>
843    where
844        E: Executor,
845    {
846        count_query(<Query as From<LockedQueryOf<M>>>::from(self), exec).await
847    }
848}
849
850impl<Pivot, ParentMeta, RelatedMeta, ParentKey, RelatedKey>
851    AttachBelongsToMany<Pivot, ParentMeta, RelatedMeta, ParentKey, RelatedKey>
852where
853    Pivot: crate::Qrafting + 'static,
854    ParentMeta: crate::TypeMeta + crate::Comparable + crate::Nullability,
855    RelatedMeta: crate::TypeMeta + crate::Comparable + crate::Nullability,
856    ParentKey: Clone + crate::Compatible<ParentMeta>,
857    RelatedKey: Clone + crate::Compatible<RelatedMeta>,
858    crate::BinaryType<crate::NullOf<ParentMeta>, crate::NullOf<ParentMeta>>: crate::PredicateType,
859    crate::BinaryType<crate::NullOf<RelatedMeta>, crate::NullOf<RelatedMeta>>: crate::PredicateType,
860{
861    pub async fn execute<E>(self, exec: E) -> quex::Result<u64>
862    where
863        E: Executor,
864    {
865        let AttachBelongsToMany {
866            pivot_table,
867            parent_column,
868            related_column,
869            parent_key,
870            related_keys,
871        } = self;
872        let attached = related_keys.len() as u64;
873        if attached > 0 {
874            let values =
875                build_many_to_many_values(parent_column, related_column, &parent_key, related_keys);
876            insert_into(pivot_table)
877                .values(values)
878                .no_returning()
879                .execute(exec)
880                .await?;
881        }
882        Ok(attached)
883    }
884}
885
886impl<Pivot, ParentMeta, RelatedMeta, ParentKey, RelatedKey>
887    AttachMorphToMany<Pivot, ParentMeta, RelatedMeta, ParentKey, RelatedKey>
888where
889    Pivot: crate::Qrafting + 'static,
890    ParentMeta: crate::TypeMeta + crate::Comparable + crate::Nullability,
891    RelatedMeta: crate::TypeMeta + crate::Comparable + crate::Nullability,
892    ParentKey: Clone + crate::Compatible<ParentMeta>,
893    RelatedKey: Clone + crate::Compatible<RelatedMeta>,
894    crate::BinaryType<crate::NullOf<ParentMeta>, crate::NullOf<ParentMeta>>: crate::PredicateType,
895    crate::BinaryType<crate::NullOf<RelatedMeta>, crate::NullOf<RelatedMeta>>: crate::PredicateType,
896{
897    pub async fn execute<E>(self, exec: E) -> quex::Result<u64>
898    where
899        E: Executor,
900    {
901        let AttachMorphToMany {
902            pivot_table,
903            parent_column,
904            type_column,
905            related_column,
906            parent_key,
907            morph_type,
908            related_keys,
909        } = self;
910        let attached = related_keys.len() as u64;
911        if attached > 0 {
912            let values = build_morph_to_many_values(MorphToManyValueSpec {
913                parent_column,
914                type_column,
915                related_column,
916                parent_key,
917                morph_type,
918                related_keys,
919            });
920            insert_into(pivot_table)
921                .values(values)
922                .no_returning()
923                .execute(exec)
924                .await?;
925        }
926        Ok(attached)
927    }
928}
929
930macro_rules! impl_sync_action {
931    ($ty:ident, $result:ty, $detach_iter:ident) => {
932        impl<Pivot, ParentMeta, RelatedMeta, ParentKey, RelatedKey>
933            $ty<Pivot, ParentMeta, RelatedMeta, ParentKey, RelatedKey>
934        where
935            Pivot: crate::Qrafting + 'static,
936            ParentMeta: crate::TypeMeta + crate::Comparable + crate::Nullability,
937            RelatedMeta: crate::TypeMeta + crate::Comparable + crate::Nullability,
938            ParentKey: Clone + crate::Compatible<ParentMeta>,
939            RelatedKey: Clone + Eq + Hash + quex::FromRow + crate::Compatible<RelatedMeta>,
940            crate::BinaryType<crate::NullOf<ParentMeta>, crate::NullOf<ParentMeta>>:
941                crate::PredicateType,
942            crate::BinaryType<crate::NullOf<RelatedMeta>, crate::NullOf<RelatedMeta>>:
943                crate::PredicateType,
944            crate::UnaryType<crate::NullOf<RelatedMeta>>: crate::PredicateType,
945        {
946            pub async fn execute<E>(self, exec: E) -> quex::Result<$result>
947            where
948                E: Executor + Clone,
949            {
950                let existing: Vec<RelatedKey> = self.existing_query().all(exec.clone()).await?;
951                let desired = self.desired_keys();
952
953                let mut existing_set = HashSet::with_capacity(existing.len());
954                existing_set.extend(existing);
955                let mut desired_set = HashSet::with_capacity(desired.len());
956                desired_set.extend(desired.iter().cloned());
957
958                let to_attach = desired_set
959                    .difference(&existing_set)
960                    .cloned()
961                    .collect::<Vec<_>>();
962                let attached = to_attach.len() as u64;
963                if !to_attach.is_empty() {
964                    self.attach_insert_many(to_attach)
965                        .execute(exec.clone())
966                        .await?;
967                }
968
969                let to_detach = $detach_iter(&desired_set, &existing_set)
970                    .cloned()
971                    .collect::<Vec<_>>();
972                let detached = if to_detach.is_empty() {
973                    0
974                } else {
975                    self.detach_query_many(to_detach)
976                        .execute(exec.clone())
977                        .await?
978                        .rows_affected
979                };
980
981                Ok(sync_action_result::<$result>(attached, detached))
982            }
983        }
984    };
985}
986
987fn existing_minus_desired<'a, T: Eq + Hash>(
988    desired: &'a HashSet<T>,
989    existing: &'a HashSet<T>,
990) -> std::collections::hash_set::Difference<'a, T, std::collections::hash_map::RandomState> {
991    existing.difference(desired)
992}
993
994fn desired_intersection_existing<'a, T: Eq + Hash>(
995    desired: &'a HashSet<T>,
996    existing: &'a HashSet<T>,
997) -> std::collections::hash_set::Intersection<'a, T, std::collections::hash_map::RandomState> {
998    desired.intersection(existing)
999}
1000
1001trait SyncActionResultCtor {
1002    fn new(attached: u64, detached: u64) -> Self;
1003}
1004
1005impl SyncActionResultCtor for SyncResult {
1006    fn new(attached: u64, detached: u64) -> Self {
1007        Self { attached, detached }
1008    }
1009}
1010
1011impl SyncActionResultCtor for ToggleResult {
1012    fn new(attached: u64, detached: u64) -> Self {
1013        Self { attached, detached }
1014    }
1015}
1016
1017fn sync_action_result<R: SyncActionResultCtor>(attached: u64, detached: u64) -> R {
1018    R::new(attached, detached)
1019}
1020
1021impl_sync_action!(SyncBelongsToMany, SyncResult, existing_minus_desired);
1022impl_sync_action!(SyncMorphToMany, SyncResult, existing_minus_desired);
1023impl_sync_action!(
1024    ToggleBelongsToMany,
1025    ToggleResult,
1026    desired_intersection_existing
1027);
1028impl_sync_action!(
1029    ToggleMorphToMany,
1030    ToggleResult,
1031    desired_intersection_existing
1032);
1033
1034macro_rules! impl_sync_without_detaching_action {
1035    ($ty:ident) => {
1036        impl<Pivot, ParentMeta, RelatedMeta, ParentKey, RelatedKey>
1037            $ty<Pivot, ParentMeta, RelatedMeta, ParentKey, RelatedKey>
1038        where
1039            Pivot: crate::Qrafting,
1040            ParentMeta: crate::TypeMeta + crate::Comparable + crate::Nullability,
1041            RelatedMeta: crate::TypeMeta + crate::Comparable + crate::Nullability,
1042            ParentKey: Clone + crate::Compatible<ParentMeta>,
1043            RelatedKey: Clone + Eq + Hash + quex::FromRow + crate::Compatible<RelatedMeta>,
1044            crate::BinaryType<crate::NullOf<ParentMeta>, crate::NullOf<ParentMeta>>:
1045                crate::PredicateType,
1046            crate::BinaryType<crate::NullOf<RelatedMeta>, crate::NullOf<RelatedMeta>>:
1047                crate::PredicateType,
1048            crate::UnaryType<crate::NullOf<RelatedMeta>>: crate::PredicateType,
1049        {
1050            pub async fn execute<E>(self, exec: E) -> quex::Result<SyncWithoutDetachingResult>
1051            where
1052                E: Executor + Clone,
1053            {
1054                let existing: Vec<RelatedKey> = self.existing_query().all(exec.clone()).await?;
1055                let desired = self.desired_keys();
1056
1057                let mut existing_set = HashSet::with_capacity(existing.len());
1058                existing_set.extend(existing);
1059
1060                let to_attach = desired
1061                    .iter()
1062                    .filter(|related_key| !existing_set.contains(*related_key))
1063                    .cloned()
1064                    .collect::<Vec<_>>();
1065                let attached = to_attach.len() as u64;
1066                if !to_attach.is_empty() {
1067                    self.attach_insert_many(to_attach).execute(exec).await?;
1068                }
1069
1070                Ok(SyncWithoutDetachingResult { attached })
1071            }
1072        }
1073    };
1074}
1075
1076impl_sync_without_detaching_action!(SyncWithoutDetachingBelongsToMany);
1077impl_sync_without_detaching_action!(SyncWithoutDetachingMorphToMany);